review suggestion: remove files

This commit is contained in:
Jeff Epler 2021-06-21 19:50:37 -05:00
parent 4401d73efd
commit 680ac9388f
43 changed files with 0 additions and 2657 deletions

View File

@ -1,75 +0,0 @@
# M5Stack ATOM MicroPython Helper Library
# MIT license; Copyright (c) 2021 IAMLIUBO work for M5STACK
#
# Hardware details:
# ATOM Lite https://docs.m5stack.com/en/core/atom_lite
# ATOM Matrix https://docs.m5stack.com/en/core/atom_matrix
from micropython import const
from machine import Pin
import neopixel
# M5STACK ATOM Hardware Pin Assignments
"""
FRONT
|3V3|
|G21| IR G12 |G22|
|G25| BTN G39 |G19|
| 5V| WS2812 G27 |G23|
|GNG| MPU G21 G25 |G33|
G32 G26 5V GND
Grove Port
"""
# WS2812
WS2812_PIN = const(27)
# Button
BUTTON_PIN = const(39)
# IR
IR_PIN = const(12)
# I2C
I2C0_SCL_PIN = const(21)
I2C0_SDA_PIN = const(25)
# Grove port
GROVE_PORT_PIN = (const(26), const(32))
class ATOM:
def __init__(self, np_n):
self._np = neopixel.NeoPixel(pin=Pin(WS2812_PIN), n=np_n)
self._btn = Pin(BUTTON_PIN, Pin.IN, Pin.PULL_UP)
def get_button_status(self):
return self._btn.value()
def set_button_callback(self, cb):
self._btn.irq(trigger=Pin.IRQ_FALLING, handler=cb)
def set_pixel_color(self, num, r, g, b):
if num <= self._np.n:
self._np[num] = [r, g, b]
self._np.write()
def get_pixel_color(self, num):
if num <= self._np.n:
return self._np[num]
def set_pixels_color(self, r, g, b):
self._np.fill([r, g, b])
self._np.write()
class Lite(ATOM):
# WS2812 number: 1
def __init__(self):
super(Lite, self).__init__(np_n=1)
class Matrix(ATOM):
# WS2812 number: 25
def __init__(self):
super(Matrix, self).__init__(np_n=25)

View File

@ -1,10 +0,0 @@
set(SDKCONFIG_DEFAULTS
boards/sdkconfig.base
boards/sdkconfig.ble
boards/sdkconfig.240mhz
boards/M5STACK_ATOM/sdkconfig.board
)
if(NOT MICROPY_FROZEN_MANIFEST)
set(MICROPY_FROZEN_MANIFEST ${MICROPY_BOARD_DIR}/manifest.py)
endif()

View File

@ -1,2 +0,0 @@
#define MICROPY_HW_BOARD_NAME "M5Stack ATOM"
#define MICROPY_HW_MCU_NAME "ESP32-PICO-D4"

View File

@ -1,5 +0,0 @@
CONFIG_FLASHMODE_QIO=y
CONFIG_ESPTOOLPY_FLASHFREQ_80M=y
CONFIG_SPIRAM_SPEED_80M=y
CONFIG_ESP32_REV_MIN_1=y
CONFIG_LWIP_LOCAL_HOSTNAME="M5StackATOM"

View File

@ -1,3 +0,0 @@
include("$(PORT_DIR)/boards/manifest.py")
freeze("$(PORT_DIR)/boards/UM_TINYPICO/modules", "dotstar.py")
freeze("modules")

View File

@ -1,101 +0,0 @@
# FeatherS2 MicroPython Helper Library
# 2021 Seon Rozenblum - Unexpected Maker
#
# Project home:
# https://feathers2.io
#
# 2021-Mar-21 - v0.1 - Initial implementation
# Import required libraries
from micropython import const
from machine import Pin, SPI, ADC
import machine, time
# FeatherS2 Hardware Pin Assignments
# LDO
LDO2 = const(21)
# APA102 Dotstar pins
DOTSTAR_CLK = const(45)
DOTSTAR_DATA = const(40)
# SPI
SPI_MOSI = const(35)
SPI_MISO = const(37)
SPI_CLK = const(36)
# I2C
I2C_SDA = const(8)
I2C_SCL = const(9)
# DAC
DAC1 = const(17)
DAC2 = const(18)
# LED & Ambient Light Sensor
LED = const(13)
AMB_LIGHT = const(4)
# Helper functions
# LED & Ambient Light Sensor control
def set_led(state):
l = Pin(LED, Pin.OUT)
l.value(state)
def toggle_led(state):
l = Pin(LED, Pin.OUT)
l.value(not l.value())
# Create ADC and set attenuation and return the ambient light value from the onboard sensor
def get_amb_light():
adc = ADC(Pin(AMB_LIGHT))
adc.atten(ADC.ATTN_11DB)
return adc.read()
# LDO2 power control
# When we manually turn off the second LDO we also set the DotStar DATA and CLK pins to input to
# prevent parasitic power from lighting the LED even with the LDO off, causing current use.
# The DotStar is a beautiful LED, but parasitic power makes it a terrible choice for battery use :(
def set_ldo2_power(state):
"""Set the power for the on-board Dotstar to allow no current draw when not needed."""
# Set the power pin to the inverse of state
ldo2 = Pin(LDO2, Pin.OUT)
ldo2.value(state)
if state:
Pin(DOTSTAR_CLK, Pin.OUT)
Pin(DOTSTAR_DATA, Pin.OUT) # If power is on, set CLK to be output, otherwise input
else:
Pin(DOTSTAR_CLK, Pin.IN)
Pin(DOTSTAR_DATA, Pin.IN) # If power is on, set CLK to be output, otherwise input
# A small delay to let the IO change state
time.sleep(0.035)
# Dotstar rainbow colour wheel
def dotstar_color_wheel(wheel_pos):
"""Color wheel to allow for cycling through the rainbow of RGB colors."""
wheel_pos = wheel_pos % 255
if wheel_pos < 85:
return 255 - wheel_pos * 3, 0, wheel_pos * 3
elif wheel_pos < 170:
wheel_pos -= 85
return 0, wheel_pos * 3, 255 - wheel_pos * 3
else:
wheel_pos -= 170
return wheel_pos * 3, 255 - wheel_pos * 3, 0
# Go into deep sleep but shut down the APA first to save power
# Use this if you want lowest deep sleep current
def go_deepsleep(t):
"""Deep sleep helper that also powers down the on-board Dotstar."""
set_ldo2_power(False)
machine.deepsleep(t)

View File

@ -1,9 +0,0 @@
set(IDF_TARGET esp32s2)
set(SDKCONFIG_DEFAULTS
boards/sdkconfig.base
boards/sdkconfig.spiram_sx
boards/sdkconfig.usb
boards/UM_FEATHERS2/sdkconfig.board
)
set(MICROPY_FROZEN_MANIFEST ${MICROPY_BOARD_DIR}/manifest.py)

View File

@ -1,12 +0,0 @@
#define MICROPY_HW_BOARD_NAME "FeatherS2"
#define MICROPY_HW_MCU_NAME "ESP32-S2"
#define MICROPY_PY_BLUETOOTH (0)
#define MICROPY_HW_ENABLE_SDCARD (0)
#define MICROPY_HW_I2C0_SCL (9)
#define MICROPY_HW_I2C0_SDA (8)
#define MICROPY_HW_SPI1_MOSI (35) // SDO
#define MICROPY_HW_SPI1_MISO (37) // SDI
#define MICROPY_HW_SPI1_SCK (36)

View File

@ -1,16 +0,0 @@
CONFIG_FLASHMODE_QIO=y
CONFIG_ESPTOOLPY_FLASHFREQ_80M=y
CONFIG_ESPTOOLPY_FLASHSIZE_DETECT=y
CONFIG_ESPTOOLPY_AFTER_NORESET=y
CONFIG_SPIRAM_MEMTEST=
CONFIG_ESPTOOLPY_FLASHSIZE_4MB=
CONFIG_ESPTOOLPY_FLASHSIZE_16MB=y
CONFIG_PARTITION_TABLE_CUSTOM=y
CONFIG_PARTITION_TABLE_CUSTOM_FILENAME="partitions-16MiB.csv"
#CONFIG_USB_AND_UART=y
# LWIP
CONFIG_LWIP_LOCAL_HOSTNAME="UMFeatherS2"
# end of LWIP

View File

@ -1,2 +0,0 @@
include("$(PORT_DIR)/boards/manifest.py")
freeze("modules")

View File

@ -1,11 +0,0 @@
set(SDKCONFIG_DEFAULTS
boards/sdkconfig.base
boards/sdkconfig.ble
boards/sdkconfig.240mhz
boards/sdkconfig.spiram
boards/UM_TINYPICO/sdkconfig.board
)
if(NOT MICROPY_FROZEN_MANIFEST)
set(MICROPY_FROZEN_MANIFEST ${MICROPY_BOARD_DIR}/manifest.py)
endif()

View File

@ -1,9 +0,0 @@
#define MICROPY_HW_BOARD_NAME "TinyPICO"
#define MICROPY_HW_MCU_NAME "ESP32-PICO-D4"
#define MICROPY_HW_I2C0_SCL (22)
#define MICROPY_HW_I2C0_SDA (21)
#define MICROPY_HW_SPI1_SCK (18)
#define MICROPY_HW_SPI1_MOSI (23)
#define MICROPY_HW_SPI1_MISO (19)

View File

@ -1,2 +0,0 @@
include("$(PORT_DIR)/boards/manifest.py")
freeze("modules")

View File

@ -1,82 +0,0 @@
# TinyS2 MicroPython Helper Library
# 2021 Seon Rozenblum - Unexpected Maker
#
# Project home:
# https://tinys2.io
#
# 2021-Apr-10 - v0.1 - Initial implementation
# Import required libraries
from micropython import const
from machine import Pin, SPI, ADC
import machine, time
# TinyS2 Hardware Pin Assignments
# Sense Pins
VBUS_SENSE = const(21)
VBAT_SENSE = const(3)
# RGB LED Pins
RGB_DATA = const(1)
RGB_PWR = const(2)
# SPI
SPI_MOSI = const(35)
SPI_MISO = const(36)
SPI_CLK = const(37)
# I2C
I2C_SDA = const(8)
I2C_SCL = const(9)
# DAC
DAC1 = const(17)
DAC2 = const(18)
# Helper functions
def set_pixel_power(state):
"""Enable or Disable power to the onboard NeoPixel to either show colour, or to reduce power for deep sleep."""
Pin(RGB_PWR, Pin.OUT).value(state)
def get_battery_voltage():
"""
Returns the current battery voltage. If no battery is connected, returns 4.2V which is the charge voltage
This is an approximation only, but useful to detect if the charge state of the battery is getting low.
"""
adc = ADC(Pin(VBAT_SENSE)) # Assign the ADC pin to read
measuredvbat = adc.read() # Read the value
measuredvbat /= 8192 # divide by 8192 as we are using the default ADC voltage range of 0-1V
measuredvbat *= 4.2 # Multiply by 4.2V, our reference voltage
return round(measuredvbat, 2)
def get_vbus_present():
"""Detect if VBUS (5V) power source is present"""
return Pin(VBUS_SENSE, Pin.IN).value() == 1
# NeoPixel rainbow colour wheel
def rgb_color_wheel(wheel_pos):
"""Color wheel to allow for cycling through the rainbow of RGB colors."""
wheel_pos = wheel_pos % 255
if wheel_pos < 85:
return 255 - wheel_pos * 3, 0, wheel_pos * 3
elif wheel_pos < 170:
wheel_pos -= 85
return 0, wheel_pos * 3, 255 - wheel_pos * 3
else:
wheel_pos -= 170
return wheel_pos * 3, 255 - wheel_pos * 3, 0
# Go into deep sleep but shut down the RGB LED first to save power
# Use this if you want lowest deep sleep current
def go_deepsleep(t):
"""Deep sleep helper that also powers down the on-board NeoPixel."""
set_pixel_power(False)
machine.deepsleep(t)

View File

@ -1,8 +0,0 @@
set(IDF_TARGET esp32s2)
set(SDKCONFIG_DEFAULTS
boards/sdkconfig.base
boards/sdkconfig.spiram_sx
boards/sdkconfig.usb
)
set(MICROPY_FROZEN_MANIFEST ${MICROPY_PORT_DIR}/boards/manifest.py)

View File

@ -1,12 +0,0 @@
#define MICROPY_HW_BOARD_NAME "TinyS2"
#define MICROPY_HW_MCU_NAME "ESP32-S2FN4R2"
#define MICROPY_PY_BLUETOOTH (0)
#define MICROPY_HW_ENABLE_SDCARD (0)
#define MICROPY_HW_I2C0_SCL (9)
#define MICROPY_HW_I2C0_SDA (8)
#define MICROPY_HW_SPI1_MOSI (35)
#define MICROPY_HW_SPI1_MISO (36)
#define MICROPY_HW_SPI1_SCK (37)

View File

@ -1,6 +0,0 @@
CONFIG_FLASHMODE_QIO=y
CONFIG_ESPTOOLPY_FLASHFREQ_80M=y
CONFIG_USB_AND_UART=y
# LWIP
CONFIG_LWIP_LOCAL_HOSTNAME="UMTinyS2"
# end of LWIP

View File

@ -1,11 +0,0 @@
# MicroPython on ESP32-S2 and ESP32-PAD1_subscript_3, ESP IDF configuration with SPIRAM support
CONFIG_ESP32S2_SPIRAM_SUPPORT=y
CONFIG_SPIRAM_TYPE_AUTO=y
CONFIG_DEFAULT_PSRAM_CLK_IO=30
CONFIG_DEFAULT_PSRAM_CS_IO=26
CONFIG_SPIRAM_SPEED_80M=y
CONFIG_SPIRAM=y
CONFIG_SPIRAM_BOOT_INIT=y
CONFIG_SPIRAM_IGNORE_NOTFOUND=y
CONFIG_SPIRAM_USE_MEMMAP=y
CONFIG_SPIRAM_MEMTEST=y

View File

@ -1,7 +0,0 @@
# Notes: the offset of the partition table itself is set in
# $IDF_PATH/components/partition_table/Kconfig.projbuild.
# Name, Type, SubType, Offset, Size, Flags
nvs, data, nvs, 0x9000, 0x6000,
phy_init, data, phy, 0xf000, 0x1000,
factory, app, factory, 0x10000, 0x180000,
vfs, data, fat, 0x200000, 0xD59F80,
1 # Notes: the offset of the partition table itself is set in
2 # $IDF_PATH/components/partition_table/Kconfig.projbuild.
3 # Name, Type, SubType, Offset, Size, Flags
4 nvs, data, nvs, 0x9000, 0x6000,
5 phy_init, data, phy, 0xf000, 0x1000,
6 factory, app, factory, 0x10000, 0x180000,
7 vfs, data, fat, 0x200000, 0xD59F80,

View File

@ -1,3 +0,0 @@
import gc
gc.threshold((gc.mem_free() + gc.mem_alloc()) // 4)

View File

@ -1,5 +0,0 @@
freeze("$(BOARD_DIR)", "_boot.py", opt=3)
freeze("$(PORT_DIR)/modules", ("apa102.py", "neopixel.py", "ntptime.py", "port_diag.py"))
freeze("$(MPY_DIR)/drivers/dht", "dht.py")
freeze("$(MPY_DIR)/drivers/onewire")
include("$(MPY_DIR)/extmod/webrepl/manifest.py")

View File

@ -1 +0,0 @@
# cmake file for SparkFun Pro Micro RP2040

View File

@ -1,3 +0,0 @@
// Board and hardware specific configuration
#define MICROPY_HW_BOARD_NAME "SparkFun Pro Micro RP2040"
#define MICROPY_HW_FLASH_STORAGE_BYTES (15 * 1024 * 1024)

View File

@ -1 +0,0 @@
# cmake file for SparkFun Thing Plus RP2040

View File

@ -1,3 +0,0 @@
// Board and hardware specific configuration
#define MICROPY_HW_BOARD_NAME "SparkFun Thing Plus RP2040"
#define MICROPY_HW_FLASH_STORAGE_BYTES (15 * 1024 * 1024)

View File

@ -1,3 +0,0 @@
freeze("$(PORT_DIR)/modules")
freeze("$(MPY_DIR)/drivers/onewire")
include("$(MPY_DIR)/extmod/uasyncio/manifest.py")

View File

@ -1,122 +0,0 @@
/*
* This file is part of the MicroPython project, http://micropython.org/
*
* The MIT License (MIT)
*
* Copyright (c) 2021 "Krzysztof Adamski" <k@japko.eu>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <sys/time.h>
#include "py/nlr.h"
#include "py/obj.h"
#include "py/runtime.h"
#include "py/mphal.h"
#include "py/mperrno.h"
#include "lib/timeutils/timeutils.h"
#include "hardware/rtc.h"
#include "pico/util/datetime.h"
#include "modmachine.h"
typedef struct _machine_rtc_obj_t {
mp_obj_base_t base;
} machine_rtc_obj_t;
// singleton RTC object
STATIC const machine_rtc_obj_t machine_rtc_obj = {{&machine_rtc_type}};
STATIC mp_obj_t machine_rtc_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *args) {
// check arguments
mp_arg_check_num(n_args, n_kw, 0, 0, false);
bool r = rtc_running();
if (!r) {
// This shouldn't happen as rtc_init() is already called in main so
// it's here just in case
rtc_init();
datetime_t t = { .month = 1, .day = 1 };
rtc_set_datetime(&t);
}
// return constant object
return (mp_obj_t)&machine_rtc_obj;
}
STATIC mp_obj_t machine_rtc_datetime(mp_uint_t n_args, const mp_obj_t *args) {
if (n_args == 1) {
bool ret;
datetime_t t;
ret = rtc_get_datetime(&t);
if (!ret) {
mp_raise_OSError(MP_EIO);
}
mp_obj_t tuple[8] = {
mp_obj_new_int(t.year),
mp_obj_new_int(t.month),
mp_obj_new_int(t.day),
mp_obj_new_int(t.dotw),
mp_obj_new_int(t.hour),
mp_obj_new_int(t.min),
mp_obj_new_int(t.sec),
mp_obj_new_int(0)
};
return mp_obj_new_tuple(8, tuple);
} else {
mp_obj_t *items;
mp_obj_get_array_fixed_n(args[1], 8, &items);
datetime_t t = {
.year = mp_obj_get_int(items[0]),
.month = mp_obj_get_int(items[1]),
.day = mp_obj_get_int(items[2]),
.dotw = mp_obj_get_int(items[3]),
.hour = mp_obj_get_int(items[4]),
.min = mp_obj_get_int(items[5]),
.sec = mp_obj_get_int(items[6]),
};
if (!rtc_set_datetime(&t)) {
mp_raise_OSError(MP_EINVAL);
}
}
return mp_const_none;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(machine_rtc_datetime_obj, 1, 2, machine_rtc_datetime);
STATIC const mp_rom_map_elem_t machine_rtc_locals_dict_table[] = {
{ MP_ROM_QSTR(MP_QSTR_datetime), MP_ROM_PTR(&machine_rtc_datetime_obj) },
};
STATIC MP_DEFINE_CONST_DICT(machine_rtc_locals_dict, machine_rtc_locals_dict_table);
const mp_obj_type_t machine_rtc_type = {
{ &mp_type_type },
.name = MP_QSTR_RTC,
.make_new = machine_rtc_make_new,
.locals_dict = (mp_obj_t)&machine_rtc_locals_dict,
};

View File

@ -1,6 +0,0 @@
CONFIG_CONSOLE_SUBSYS=n
CONFIG_NETWORKING=n
CONFIG_BT=y
CONFIG_BT_DEVICE_NAME_DYNAMIC=y
CONFIG_BT_PERIPHERAL=y
CONFIG_BT_CENTRAL=y

View File

@ -1,410 +0,0 @@
/*
* This file is part of the MicroPython project, http://micropython.org/
*
* The MIT License (MIT)
*
* Copyright (c) 2019-2021 Damien P. George
* Copyright (c) 2019-2020 Jim Mussared
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "py/runtime.h"
#include "py/mperrno.h"
#include "py/mphal.h"
#if MICROPY_PY_BLUETOOTH
#include <bluetooth/bluetooth.h>
#include <bluetooth/hci.h>
#include "extmod/modbluetooth.h"
#define DEBUG_printf(...) // printk("BLE: " __VA_ARGS__)
#define BLE_HCI_SCAN_ITVL_MIN 0x10
#define BLE_HCI_SCAN_ITVL_MAX 0xffff
#define BLE_HCI_SCAN_WINDOW_MIN 0x10
#define BLE_HCI_SCAN_WINDOW_MAX 0xffff
#define ERRNO_BLUETOOTH_NOT_ACTIVE MP_ENODEV
enum {
MP_BLUETOOTH_ZEPHYR_BLE_STATE_OFF,
MP_BLUETOOTH_ZEPHYR_BLE_STATE_ACTIVE,
MP_BLUETOOTH_ZEPHYR_BLE_STATE_SUSPENDED,
};
enum {
MP_BLUETOOTH_ZEPHYR_GAP_SCAN_STATE_INACTIVE,
MP_BLUETOOTH_ZEPHYR_GAP_SCAN_STATE_DEACTIVATING,
MP_BLUETOOTH_ZEPHYR_GAP_SCAN_STATE_ACTIVE,
};
typedef struct _mp_bluetooth_zephyr_root_pointers_t {
// Characteristic (and descriptor) value storage.
mp_gatts_db_t gatts_db;
} mp_bluetooth_zephyr_root_pointers_t;
STATIC int mp_bluetooth_zephyr_ble_state;
#if MICROPY_PY_BLUETOOTH_ENABLE_CENTRAL_MODE
STATIC int mp_bluetooth_zephyr_gap_scan_state;
STATIC struct k_timer mp_bluetooth_zephyr_gap_scan_timer;
STATIC struct bt_le_scan_cb mp_bluetooth_zephyr_gap_scan_cb_struct;
#endif
STATIC int bt_err_to_errno(int err) {
// Zephyr uses errno codes directly, but they are negative.
return -err;
}
// modbluetooth (and the layers above it) work in BE for addresses, Zephyr works in LE.
STATIC void reverse_addr_byte_order(uint8_t *addr_out, const bt_addr_le_t *addr_in) {
for (int i = 0; i < 6; ++i) {
addr_out[i] = addr_in->a.val[5 - i];
}
}
#if MICROPY_PY_BLUETOOTH_ENABLE_CENTRAL_MODE
void gap_scan_cb_recv(const struct bt_le_scan_recv_info *info, struct net_buf_simple *buf) {
DEBUG_printf("gap_scan_cb_recv: adv_type=%d\n", info->adv_type);
if (!mp_bluetooth_is_active()) {
return;
}
if (mp_bluetooth_zephyr_gap_scan_state != MP_BLUETOOTH_ZEPHYR_GAP_SCAN_STATE_ACTIVE) {
return;
}
uint8_t addr[6];
reverse_addr_byte_order(addr, info->addr);
mp_bluetooth_gap_on_scan_result(info->addr->type, addr, info->adv_type, info->rssi, buf->data, buf->len);
}
STATIC mp_obj_t gap_scan_stop(mp_obj_t unused) {
(void)unused;
mp_bluetooth_gap_scan_stop();
return mp_const_none;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(gap_scan_stop_obj, gap_scan_stop);
void gap_scan_cb_timeout(struct k_timer *timer_id) {
DEBUG_printf("gap_scan_cb_timeout\n");
// Cannot call bt_le_scan_stop from a timer callback because this callback may be
// preempting the BT stack. So schedule it to be called from the main thread.
while (!mp_sched_schedule(MP_OBJ_FROM_PTR(&gap_scan_stop_obj), mp_const_none)) {
k_yield();
}
// Indicate scanning has stopped so that no more scan result events are generated
// (they may still come in until bt_le_scan_stop is called by gap_scan_stop).
mp_bluetooth_zephyr_gap_scan_state = MP_BLUETOOTH_ZEPHYR_GAP_SCAN_STATE_DEACTIVATING;
}
#endif
int mp_bluetooth_init(void) {
DEBUG_printf("mp_bluetooth_init\n");
// Clean up if necessary.
mp_bluetooth_deinit();
// Allocate memory for state.
MP_STATE_PORT(bluetooth_zephyr_root_pointers) = m_new0(mp_bluetooth_zephyr_root_pointers_t, 1);
mp_bluetooth_gatts_db_create(&MP_STATE_PORT(bluetooth_zephyr_root_pointers)->gatts_db);
#if MICROPY_PY_BLUETOOTH_ENABLE_CENTRAL_MODE
mp_bluetooth_zephyr_gap_scan_state = MP_BLUETOOTH_ZEPHYR_GAP_SCAN_STATE_INACTIVE;
k_timer_init(&mp_bluetooth_zephyr_gap_scan_timer, gap_scan_cb_timeout, NULL);
mp_bluetooth_zephyr_gap_scan_cb_struct.recv = gap_scan_cb_recv;
mp_bluetooth_zephyr_gap_scan_cb_struct.timeout = NULL; // currently not implemented in Zephyr
bt_le_scan_cb_register(&mp_bluetooth_zephyr_gap_scan_cb_struct);
#endif
if (mp_bluetooth_zephyr_ble_state == MP_BLUETOOTH_ZEPHYR_BLE_STATE_OFF) {
// bt_enable can only be called once.
int ret = bt_enable(NULL);
if (ret) {
return bt_err_to_errno(ret);
}
}
mp_bluetooth_zephyr_ble_state = MP_BLUETOOTH_ZEPHYR_BLE_STATE_ACTIVE;
DEBUG_printf("mp_bluetooth_init: ready\n");
return 0;
}
void mp_bluetooth_deinit(void) {
DEBUG_printf("mp_bluetooth_deinit %d\n", mp_bluetooth_zephyr_ble_state);
if (mp_bluetooth_zephyr_ble_state == MP_BLUETOOTH_ZEPHYR_BLE_STATE_OFF
|| mp_bluetooth_zephyr_ble_state == MP_BLUETOOTH_ZEPHYR_BLE_STATE_SUSPENDED) {
return;
}
mp_bluetooth_gap_advertise_stop();
#if MICROPY_PY_BLUETOOTH_ENABLE_CENTRAL_MODE
mp_bluetooth_gap_scan_stop();
bt_le_scan_cb_unregister(&mp_bluetooth_zephyr_gap_scan_cb_struct);
#endif
// There is no way to turn off the BT stack in Zephyr, so just set the
// state as suspended so it can be correctly reactivated later.
mp_bluetooth_zephyr_ble_state = MP_BLUETOOTH_ZEPHYR_BLE_STATE_SUSPENDED;
MP_STATE_PORT(bluetooth_zephyr_root_pointers) = NULL;
}
bool mp_bluetooth_is_active(void) {
return mp_bluetooth_zephyr_ble_state == MP_BLUETOOTH_ZEPHYR_BLE_STATE_ACTIVE;
}
void mp_bluetooth_get_current_address(uint8_t *addr_type, uint8_t *addr) {
if (!mp_bluetooth_is_active()) {
mp_raise_OSError(ERRNO_BLUETOOTH_NOT_ACTIVE);
}
bt_addr_le_t le_addr;
size_t count = 1;
bt_id_get(&le_addr, &count);
if (count == 0) {
mp_raise_OSError(EIO);
}
reverse_addr_byte_order(addr, &le_addr);
*addr_type = le_addr.type;
}
void mp_bluetooth_set_address_mode(uint8_t addr_mode) {
// TODO: implement
}
size_t mp_bluetooth_gap_get_device_name(const uint8_t **buf) {
const char *name = bt_get_name();
*buf = (const uint8_t *)name;
return strlen(name);
}
int mp_bluetooth_gap_set_device_name(const uint8_t *buf, size_t len) {
char tmp_buf[CONFIG_BT_DEVICE_NAME_MAX + 1];
if (len + 1 > sizeof(tmp_buf)) {
return MP_EINVAL;
}
memcpy(tmp_buf, buf, len);
tmp_buf[len] = '\0';
return bt_err_to_errno(bt_set_name(tmp_buf));
}
// Zephyr takes advertising/scan data as an array of (type, len, payload) packets,
// and this function constructs such an array from raw advertising/scan data.
STATIC void mp_bluetooth_prepare_bt_data(const uint8_t *data, size_t len, struct bt_data *bt_data, size_t *bt_len) {
size_t i = 0;
const uint8_t *d = data;
while (d < data + len && i < *bt_len) {
bt_data[i].type = d[1];
bt_data[i].data_len = d[0] - 1;
bt_data[i].data = &d[2];
i += 1;
d += 1 + d[0];
}
*bt_len = i;
}
int mp_bluetooth_gap_advertise_start(bool connectable, int32_t interval_us, const uint8_t *adv_data, size_t adv_data_len, const uint8_t *sr_data, size_t sr_data_len) {
if (!mp_bluetooth_is_active()) {
return ERRNO_BLUETOOTH_NOT_ACTIVE;
}
mp_bluetooth_gap_advertise_stop();
struct bt_data bt_ad_data[8];
size_t bt_ad_len = 0;
if (adv_data) {
bt_ad_len = MP_ARRAY_SIZE(bt_ad_data);
mp_bluetooth_prepare_bt_data(adv_data, adv_data_len, bt_ad_data, &bt_ad_len);
}
struct bt_data bt_sd_data[8];
size_t bt_sd_len = 0;
if (sr_data) {
bt_sd_len = MP_ARRAY_SIZE(bt_sd_data);
mp_bluetooth_prepare_bt_data(sr_data, sr_data_len, bt_sd_data, &bt_sd_len);
}
struct bt_le_adv_param param = {
.id = 0,
.sid = 0,
.secondary_max_skip = 0,
.options = (connectable ? BT_LE_ADV_OPT_CONNECTABLE : 0)
| BT_LE_ADV_OPT_ONE_TIME
| BT_LE_ADV_OPT_USE_IDENTITY
| BT_LE_ADV_OPT_SCANNABLE,
.interval_min = interval_us / 625,
.interval_max = interval_us / 625 + 1, // min/max cannot be the same value
.peer = NULL,
};
return bt_err_to_errno(bt_le_adv_start(&param, bt_ad_data, bt_ad_len, bt_sd_data, bt_sd_len));
}
void mp_bluetooth_gap_advertise_stop(void) {
// Note: bt_le_adv_stop returns 0 if adv is already stopped.
int ret = bt_le_adv_stop();
if (ret != 0) {
mp_raise_OSError(bt_err_to_errno(ret));
}
}
int mp_bluetooth_gatts_register_service_begin(bool append) {
if (!mp_bluetooth_is_active()) {
return ERRNO_BLUETOOTH_NOT_ACTIVE;
}
if (append) {
// Don't support append yet (modbluetooth.c doesn't support it yet anyway).
return MP_EOPNOTSUPP;
}
// Reset the gatt characteristic value db.
mp_bluetooth_gatts_db_reset(MP_STATE_PORT(bluetooth_zephyr_root_pointers)->gatts_db);
return MP_EOPNOTSUPP;
}
int mp_bluetooth_gatts_register_service_end(void) {
return MP_EOPNOTSUPP;
}
int mp_bluetooth_gatts_register_service(mp_obj_bluetooth_uuid_t *service_uuid, mp_obj_bluetooth_uuid_t **characteristic_uuids, uint16_t *characteristic_flags, mp_obj_bluetooth_uuid_t **descriptor_uuids, uint16_t *descriptor_flags, uint8_t *num_descriptors, uint16_t *handles, size_t num_characteristics) {
return MP_EOPNOTSUPP;
}
int mp_bluetooth_gap_disconnect(uint16_t conn_handle) {
if (!mp_bluetooth_is_active()) {
return ERRNO_BLUETOOTH_NOT_ACTIVE;
}
return MP_EOPNOTSUPP;
}
int mp_bluetooth_gatts_read(uint16_t value_handle, uint8_t **value, size_t *value_len) {
if (!mp_bluetooth_is_active()) {
return ERRNO_BLUETOOTH_NOT_ACTIVE;
}
return mp_bluetooth_gatts_db_read(MP_STATE_PORT(bluetooth_zephyr_root_pointers)->gatts_db, value_handle, value, value_len);
}
int mp_bluetooth_gatts_write(uint16_t value_handle, const uint8_t *value, size_t value_len) {
if (!mp_bluetooth_is_active()) {
return ERRNO_BLUETOOTH_NOT_ACTIVE;
}
return mp_bluetooth_gatts_db_write(MP_STATE_PORT(bluetooth_zephyr_root_pointers)->gatts_db, value_handle, value, value_len);
}
int mp_bluetooth_gatts_notify(uint16_t conn_handle, uint16_t value_handle) {
if (!mp_bluetooth_is_active()) {
return ERRNO_BLUETOOTH_NOT_ACTIVE;
}
return MP_EOPNOTSUPP;
}
int mp_bluetooth_gatts_notify_send(uint16_t conn_handle, uint16_t value_handle, const uint8_t *value, size_t value_len) {
if (!mp_bluetooth_is_active()) {
return ERRNO_BLUETOOTH_NOT_ACTIVE;
}
return MP_EOPNOTSUPP;
}
int mp_bluetooth_gatts_indicate(uint16_t conn_handle, uint16_t value_handle) {
if (!mp_bluetooth_is_active()) {
return ERRNO_BLUETOOTH_NOT_ACTIVE;
}
return MP_EOPNOTSUPP;
}
int mp_bluetooth_gatts_set_buffer(uint16_t value_handle, size_t len, bool append) {
if (!mp_bluetooth_is_active()) {
return ERRNO_BLUETOOTH_NOT_ACTIVE;
}
return MP_EOPNOTSUPP;
}
int mp_bluetooth_get_preferred_mtu(void) {
if (!mp_bluetooth_is_active()) {
mp_raise_OSError(ERRNO_BLUETOOTH_NOT_ACTIVE);
}
mp_raise_OSError(MP_EOPNOTSUPP);
}
int mp_bluetooth_set_preferred_mtu(uint16_t mtu) {
if (!mp_bluetooth_is_active()) {
return ERRNO_BLUETOOTH_NOT_ACTIVE;
}
return MP_EOPNOTSUPP;
}
#if MICROPY_PY_BLUETOOTH_ENABLE_CENTRAL_MODE
int mp_bluetooth_gap_scan_start(int32_t duration_ms, int32_t interval_us, int32_t window_us, bool active_scan) {
// Stop any ongoing GAP scan.
int ret = mp_bluetooth_gap_scan_stop();
if (ret) {
return ret;
}
struct bt_le_scan_param param = {
.type = active_scan ? BT_HCI_LE_SCAN_ACTIVE : BT_HCI_LE_SCAN_PASSIVE,
.options = BT_LE_SCAN_OPT_NONE,
.interval = MAX(BLE_HCI_SCAN_ITVL_MIN, MIN(BLE_HCI_SCAN_ITVL_MAX, interval_us / 625)),
.window = MAX(BLE_HCI_SCAN_WINDOW_MIN, MIN(BLE_HCI_SCAN_WINDOW_MAX, window_us / 625)),
};
k_timer_start(&mp_bluetooth_zephyr_gap_scan_timer, K_MSEC(duration_ms), K_NO_WAIT);
mp_bluetooth_zephyr_gap_scan_state = MP_BLUETOOTH_ZEPHYR_GAP_SCAN_STATE_ACTIVE;
int err = bt_le_scan_start(&param, NULL);
return bt_err_to_errno(err);
}
int mp_bluetooth_gap_scan_stop(void) {
DEBUG_printf("mp_bluetooth_gap_scan_stop\n");
if (!mp_bluetooth_is_active()) {
return ERRNO_BLUETOOTH_NOT_ACTIVE;
}
if (mp_bluetooth_zephyr_gap_scan_state == MP_BLUETOOTH_ZEPHYR_GAP_SCAN_STATE_INACTIVE) {
// Already stopped.
return 0;
}
mp_bluetooth_zephyr_gap_scan_state = MP_BLUETOOTH_ZEPHYR_GAP_SCAN_STATE_INACTIVE;
k_timer_stop(&mp_bluetooth_zephyr_gap_scan_timer);
int err = bt_le_scan_stop();
if (err == 0) {
mp_bluetooth_gap_on_scan_complete();
return 0;
}
return bt_err_to_errno(err);
}
int mp_bluetooth_gap_peripheral_connect(uint8_t addr_type, const uint8_t *addr, int32_t duration_ms) {
DEBUG_printf("mp_bluetooth_gap_peripheral_connect\n");
if (!mp_bluetooth_is_active()) {
return ERRNO_BLUETOOTH_NOT_ACTIVE;
}
return MP_EOPNOTSUPP;
}
#endif // MICROPY_PY_BLUETOOTH_ENABLE_CENTRAL_MODE
#endif // MICROPY_PY_BLUETOOTH

View File

@ -1,74 +0,0 @@
/*
* This file is part of the MicroPython project, http://micropython.org/
*
* The MIT License (MIT)
*
* Copyright (c) 2021 Damien P. George
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "py/runtime.h"
#include "py/mphal.h"
static struct k_poll_signal wait_signal;
static struct k_poll_event wait_events[2] = {
K_POLL_EVENT_INITIALIZER(K_POLL_TYPE_SIGNAL,
K_POLL_MODE_NOTIFY_ONLY,
&wait_signal),
K_POLL_EVENT_STATIC_INITIALIZER(K_POLL_TYPE_SEM_AVAILABLE,
K_POLL_MODE_NOTIFY_ONLY,
NULL, 0),
};
void mp_hal_init(void) {
k_poll_signal_init(&wait_signal);
}
void mp_hal_signal_event(void) {
k_poll_signal_raise(&wait_signal, 0);
}
void mp_hal_wait_sem(struct k_sem *sem, uint32_t timeout_ms) {
mp_uint_t t0 = mp_hal_ticks_ms();
if (sem) {
k_poll_event_init(&wait_events[1], K_POLL_TYPE_SEM_AVAILABLE, K_POLL_MODE_NOTIFY_ONLY, sem);
}
for (;;) {
k_timeout_t wait;
if (timeout_ms == (uint32_t)-1) {
wait = K_FOREVER;
} else {
uint32_t dt = mp_hal_ticks_ms() - t0;
if (dt >= timeout_ms) {
return;
}
wait = K_MSEC(timeout_ms - dt);
}
k_poll(wait_events, sem ? 2 : 1, wait);
if (wait_events[0].state == K_POLL_STATE_SIGNALED) {
wait_events[0].signal->signaled = 0;
wait_events[0].state = K_POLL_STATE_NOT_READY;
mp_handle_pending(true);
} else if (sem && wait_events[1].state == K_POLL_STATE_SEM_AVAILABLE) {
wait_events[1].state = K_POLL_STATE_NOT_READY;
return;
}
}
}

View File

@ -1,151 +0,0 @@
# Write characteristic from central to peripheral and time data rate.
from micropython import const
import time, machine, bluetooth
TIMEOUT_MS = 2000
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_MTU_EXCHANGED = const(21)
# How long to run the test for.
_NUM_NOTIFICATIONS = const(40)
_MTU_SIZE = const(131)
_CHAR_SIZE = const(_MTU_SIZE - 3)
SERVICE_UUID = bluetooth.UUID("A5A5A5A5-FFFF-9999-1111-5A5A5A5A5A5A")
CHAR_UUID = bluetooth.UUID("00000000-1111-2222-3333-444444444444")
CHAR = (CHAR_UUID, bluetooth.FLAG_WRITE | bluetooth.FLAG_WRITE_NO_RESPONSE)
SERVICE = (SERVICE_UUID, (CHAR,))
SERVICES = (SERVICE,)
packet_sequence = 0
waiting_events = {}
def irq(event, data):
if event == _IRQ_CENTRAL_CONNECT:
waiting_events[event] = data[0]
elif event == _IRQ_PERIPHERAL_CONNECT:
waiting_events[event] = data[0]
elif event == _IRQ_GATTS_WRITE:
global packet_sequence
conn_handle, attr_handle = data
data = ble.gatts_read(attr_handle)
if not (data[0] == packet_sequence and data[-1] == (256 - packet_sequence) & 0xFF):
print("_IRQ_GATTS_WRITE data invalid:", packet_sequence, data)
elif packet_sequence % 10 == 0:
print("_IRQ_GATTS_WRITE", packet_sequence)
packet_sequence += 1
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
# conn_handle, def_handle, value_handle, properties, uuid = data
if data[-1] == CHAR_UUID:
waiting_events[event] = data[2]
else:
return
elif event == _IRQ_MTU_EXCHANGED:
# ATT MTU exchange complete (either initiated by us or the remote device).
conn_handle, mtu = data
print("_IRQ_MTU_EXCHANGED:", mtu)
if event not in waiting_events:
waiting_events[event] = None
def wait_for_event(event, timeout_ms):
t0 = time.ticks_ms()
while time.ticks_diff(time.ticks_ms(), t0) < timeout_ms:
if event in waiting_events:
return waiting_events.pop(event)
machine.idle()
raise ValueError("Timeout waiting for {}".format(event))
# Acting in peripheral role.
def instance0():
multitest.globals(BDADDR=ble.config("mac"))
((char_handle,),) = ble.gatts_register_services(SERVICES)
ble.gatts_set_buffer(char_handle, _CHAR_SIZE)
print("gap_advertise")
ble.gap_advertise(20_000, b"\x02\x01\x06\x04\xffMPY")
multitest.next()
try:
# Wait for central to connect to us.
conn_handle = wait_for_event(_IRQ_CENTRAL_CONNECT, TIMEOUT_MS)
# Wait for central to disconnect us.
wait_for_event(_IRQ_CENTRAL_DISCONNECT, 30000)
print("final packet_sequence:", packet_sequence)
finally:
ble.active(0)
# Acting in central role.
def instance1():
global packet_sequence
((char_handle,),) = ble.gatts_register_services(SERVICES)
multitest.next()
try:
# Connect to peripheral and then disconnect.
print("gap_connect")
ble.config(mtu=_MTU_SIZE)
ble.gap_connect(*BDADDR)
conn_handle = wait_for_event(_IRQ_PERIPHERAL_CONNECT, TIMEOUT_MS)
ble.gattc_exchange_mtu(conn_handle)
# Discover characteristics.
ble.gattc_discover_characteristics(conn_handle, 1, 65535)
value_handle = wait_for_event(_IRQ_GATTC_CHARACTERISTIC_RESULT, TIMEOUT_MS)
wait_for_event(_IRQ_GATTC_CHARACTERISTIC_DONE, TIMEOUT_MS)
# Send data!
data = bytearray(ord("A") + (i % 64) for i in range(_CHAR_SIZE))
for mode in (0, 1):
ticks_start = time.ticks_ms()
for i in range(_NUM_NOTIFICATIONS):
data[0] = packet_sequence
data[-1] = 256 - packet_sequence
if packet_sequence % 10 == 0:
print("gattc_write", packet_sequence)
if mode == 0:
while True:
try:
ble.gattc_write(conn_handle, value_handle, data, mode)
break
except OSError:
pass
else:
ble.gattc_write(conn_handle, value_handle, data, mode)
wait_for_event(_IRQ_GATTC_WRITE_DONE, TIMEOUT_MS)
packet_sequence += 1
ticks_end = time.ticks_ms()
ticks_total = time.ticks_diff(ticks_end, ticks_start)
print(
"Did {} writes in {} ms. {} ms/write, {} bytes/sec".format(
_NUM_NOTIFICATIONS,
ticks_total,
ticks_total / _NUM_NOTIFICATIONS,
_NUM_NOTIFICATIONS * len(data) * 1000 // ticks_total,
)
)
time.sleep_ms(100)
# DIsconnect the peripheral.
print("gap_disconnect:", ble.gap_disconnect(conn_handle))
wait_for_event(_IRQ_PERIPHERAL_DISCONNECT, 20000)
finally:
ble.active(0)
ble = bluetooth.BLE()
ble.active(1)
ble.irq(irq)

View File

@ -1,24 +0,0 @@
--- instance0 ---
gap_advertise
_IRQ_MTU_EXCHANGED: 131
_IRQ_GATTS_WRITE 0
_IRQ_GATTS_WRITE 10
_IRQ_GATTS_WRITE 20
_IRQ_GATTS_WRITE 30
_IRQ_GATTS_WRITE 40
_IRQ_GATTS_WRITE 50
_IRQ_GATTS_WRITE 60
_IRQ_GATTS_WRITE 70
final packet_sequence: 80
--- instance1 ---
gap_connect
_IRQ_MTU_EXCHANGED: 131
gattc_write 0
gattc_write 10
gattc_write 20
gattc_write 30
gattc_write 40
gattc_write 50
gattc_write 60
gattc_write 70
gap_disconnect: True

View File

@ -1,80 +0,0 @@
# Test uasyncio stream readinto() method using TCP server/client
try:
import uasyncio as asyncio
except ImportError:
try:
import asyncio
except ImportError:
print("SKIP")
raise SystemExit
try:
import uarray as array
except ImportError:
try:
import array
except ImportError:
print("SKIP")
raise SystemExit
PORT = 8000
async def handle_connection(reader, writer):
writer.write(b"ab")
await writer.drain()
writer.write(b"c")
await writer.drain()
print("close")
writer.close()
await writer.wait_closed()
print("done")
ev.set()
async def tcp_server():
global ev
ev = asyncio.Event()
server = await asyncio.start_server(handle_connection, "0.0.0.0", PORT)
print("server running")
multitest.next()
async with server:
await asyncio.wait_for(ev.wait(), 10)
async def tcp_client():
reader, writer = await asyncio.open_connection(IP, PORT)
ba = bytearray(2)
n = await reader.readinto(ba)
print(n)
print(ba[:n])
a = array.array("b", [0, 0])
n = await reader.readinto(a)
print(n)
print(a[:n])
try:
n = await reader.readinto(5)
except TypeError as er:
print("TypeError")
try:
n = await reader.readinto()
except TypeError as er:
print("TypeError")
def instance0():
multitest.globals(IP=multitest.get_network_ip())
asyncio.run(tcp_server())
def instance1():
multitest.next()
asyncio.run(tcp_client())

View File

@ -1,11 +0,0 @@
--- instance0 ---
server running
close
done
--- instance1 ---
2
bytearray(b'ab')
1
array('b', [99])
TypeError
TypeError

View File

@ -1,21 +0,0 @@
The MIT License (MIT)
Copyright (c) 2021 Damien P. George
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -1,71 +0,0 @@
# mpremote -- MicroPython remote control
This CLI tool provides an integrated set of utilities to remotely interact with
and automate a MicroPython device over a serial connection.
The simplest way to use this tool is:
mpremote
This will automatically connect to the device and provide an interactive REPL.
The full list of supported commands are:
mpremote connect <device> -- connect to given device
device may be: list, auto, id:x, port:x
or any valid device name/path
mpremote disconnect -- disconnect current device
mpremote mount <local-dir> -- mount local directory on device
mpremote eval <string> -- evaluate and print the string
mpremote exec <string> -- execute the string
mpremote run <file> -- run the given local script
mpremote fs <command> <args...> -- execute filesystem commands on the device
command may be: cat, ls, cp, rm, mkdir, rmdir
use ":" as a prefix to specify a file on the device
mpremote repl -- enter REPL
options:
--capture <file>
--inject-code <string>
--inject-file <file>
Multiple commands can be specified and they will be run sequentially. Connection
and disconnection will be done automatically at the start and end of the execution
of the tool, if such commands are not explicitly given. Automatic connection will
search for the first available serial device. If no action is specified then the
REPL will be entered.
Shortcuts can be defined using the macro system. Built-in shortcuts are:
- a0, a1, a2, a3: connect to `/dev/ttyACM?`
- u0, u1, u2, u3: connect to `/dev/ttyUSB?`
- c0, c1, c2, c3: connect to `COM?`
- cat, ls, cp, rm, mkdir, rmdir, df: filesystem commands
- reset: reset the device
- bootloader: make the device enter its bootloader
Any user configuration, including user-defined shortcuts, can be placed in
.config/mpremote/config.py. For example:
# Custom macro commands
commands = {
"c33": "connect id:334D335C3138",
"bl": "bootloader",
"double x=4": "eval x*2",
}
Examples:
mpremote
mpremote a1
mpremote connect /dev/ttyUSB0 repl
mpremote ls
mpremote a1 ls
mpremote exec "import micropython; micropython.mem_info()"
mpremote eval 1/2 eval 3/4
mpremote mount .
mpremote mount . exec "import local_script"
mpremote ls
mpremote cat boot.py
mpremote cp :main.py .
mpremote cp main.py :
mpremote cp -r dir/ :

View File

@ -1,6 +0,0 @@
#!/usr/bin/env python3
import sys
from mpremote import main
sys.exit(main.main())

View File

@ -1 +0,0 @@
# empty

View File

@ -1,171 +0,0 @@
import sys, time
try:
import select, termios
except ImportError:
termios = None
select = None
import msvcrt, signal
class ConsolePosix:
def __init__(self):
self.infd = sys.stdin.fileno()
self.infile = sys.stdin.buffer.raw
self.outfile = sys.stdout.buffer.raw
self.orig_attr = termios.tcgetattr(self.infd)
def enter(self):
# attr is: [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
attr = termios.tcgetattr(self.infd)
attr[0] &= ~(
termios.BRKINT | termios.ICRNL | termios.INPCK | termios.ISTRIP | termios.IXON
)
attr[1] = 0
attr[2] = attr[2] & ~(termios.CSIZE | termios.PARENB) | termios.CS8
attr[3] = 0
attr[6][termios.VMIN] = 1
attr[6][termios.VTIME] = 0
termios.tcsetattr(self.infd, termios.TCSANOW, attr)
def exit(self):
termios.tcsetattr(self.infd, termios.TCSANOW, self.orig_attr)
def waitchar(self, pyb_serial):
# TODO pyb_serial might not have fd
select.select([self.infd, pyb_serial.fd], [], [])
def readchar(self):
res = select.select([self.infd], [], [], 0)
if res[0]:
return self.infile.read(1)
else:
return None
def write(self, buf):
self.outfile.write(buf)
class ConsoleWindows:
KEY_MAP = {
b"H": b"A", # UP
b"P": b"B", # DOWN
b"M": b"C", # RIGHT
b"K": b"D", # LEFT
b"G": b"H", # POS1
b"O": b"F", # END
b"Q": b"6~", # PGDN
b"I": b"5~", # PGUP
b"s": b"1;5D", # CTRL-LEFT,
b"t": b"1;5C", # CTRL-RIGHT,
b"\x8d": b"1;5A", # CTRL-UP,
b"\x91": b"1;5B", # CTRL-DOWN,
b"w": b"1;5H", # CTRL-POS1
b"u": b"1;5F", # CTRL-END
b"\x98": b"1;3A", # ALT-UP,
b"\xa0": b"1;3B", # ALT-DOWN,
b"\x9d": b"1;3C", # ALT-RIGHT,
b"\x9b": b"1;3D", # ALT-LEFT,
b"\x97": b"1;3H", # ALT-POS1,
b"\x9f": b"1;3F", # ALT-END,
b"S": b"3~", # DEL,
b"\x93": b"3;5~", # CTRL-DEL
b"R": b"2~", # INS
b"\x92": b"2;5~", # CTRL-INS
b"\x94": b"Z", # Ctrl-Tab = BACKTAB,
}
def __init__(self):
self.ctrl_c = 0
def _sigint_handler(self, signo, frame):
self.ctrl_c += 1
def enter(self):
signal.signal(signal.SIGINT, self._sigint_handler)
def exit(self):
signal.signal(signal.SIGINT, signal.SIG_DFL)
def inWaiting(self):
return 1 if self.ctrl_c or msvcrt.kbhit() else 0
def waitchar(self, pyb_serial):
while not (self.inWaiting() or pyb_serial.inWaiting()):
time.sleep(0.01)
def readchar(self):
if self.ctrl_c:
self.ctrl_c -= 1
return b"\x03"
if msvcrt.kbhit():
ch = msvcrt.getch()
while ch in b"\x00\xe0": # arrow or function key prefix?
if not msvcrt.kbhit():
return None
ch = msvcrt.getch() # second call returns the actual key code
try:
ch = b"\x1b[" + self.KEY_MAP[ch]
except KeyError:
return None
return ch
def write(self, buf):
buf = buf.decode() if isinstance(buf, bytes) else buf
sys.stdout.write(buf)
sys.stdout.flush()
# for b in buf:
# if isinstance(b, bytes):
# msvcrt.putch(b)
# else:
# msvcrt.putwch(b)
if termios:
Console = ConsolePosix
VT_ENABLED = True
else:
Console = ConsoleWindows
# Windows VT mode ( >= win10 only)
# https://bugs.python.org/msg291732
import ctypes, os
from ctypes import wintypes
kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
ERROR_INVALID_PARAMETER = 0x0057
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
def _check_bool(result, func, args):
if not result:
raise ctypes.WinError(ctypes.get_last_error())
return args
LPDWORD = ctypes.POINTER(wintypes.DWORD)
kernel32.GetConsoleMode.errcheck = _check_bool
kernel32.GetConsoleMode.argtypes = (wintypes.HANDLE, LPDWORD)
kernel32.SetConsoleMode.errcheck = _check_bool
kernel32.SetConsoleMode.argtypes = (wintypes.HANDLE, wintypes.DWORD)
def set_conout_mode(new_mode, mask=0xFFFFFFFF):
# don't assume StandardOutput is a console.
# open CONOUT$ instead
fdout = os.open("CONOUT$", os.O_RDWR)
try:
hout = msvcrt.get_osfhandle(fdout)
old_mode = wintypes.DWORD()
kernel32.GetConsoleMode(hout, ctypes.byref(old_mode))
mode = (new_mode & mask) | (old_mode.value & ~mask)
kernel32.SetConsoleMode(hout, mode)
return old_mode.value
finally:
os.close(fdout)
# def enable_vt_mode():
mode = mask = ENABLE_VIRTUAL_TERMINAL_PROCESSING
try:
set_conout_mode(mode, mask)
VT_ENABLED = True
except WindowsError as e:
VT_ENABLED = False

View File

@ -1,455 +0,0 @@
"""
MicroPython Remote - Interaction and automation tool for MicroPython
MIT license; Copyright (c) 2019-2021 Damien P. George
This program provides a set of utilities to interact with and automate a
MicroPython device over a serial connection. Commands supported are:
mpremote -- auto-detect, connect and enter REPL
mpremote <device-shortcut> -- connect to given device
mpremote connect <device> -- connect to given device
mpremote disconnect -- disconnect current device
mpremote mount <local-dir> -- mount local directory on device
mpremote eval <string> -- evaluate and print the string
mpremote exec <string> -- execute the string
mpremote run <script> -- run the given local script
mpremote fs <command> <args...> -- execute filesystem commands on the device
mpremote repl -- enter REPL
"""
import os, sys
import serial.tools.list_ports
from . import pyboardextended as pyboard
from .console import Console, ConsolePosix
_PROG = "mpremote"
_BUILTIN_COMMAND_EXPANSIONS = {
# Device connection shortcuts.
"a0": "connect /dev/ttyACM0",
"a1": "connect /dev/ttyACM1",
"a2": "connect /dev/ttyACM2",
"a3": "connect /dev/ttyACM3",
"u0": "connect /dev/ttyUSB0",
"u1": "connect /dev/ttyUSB1",
"u2": "connect /dev/ttyUSB2",
"u3": "connect /dev/ttyUSB3",
"c0": "connect COM0",
"c1": "connect COM1",
"c2": "connect COM2",
"c3": "connect COM3",
# Filesystem shortcuts.
"cat": "fs cat",
"ls": "fs ls",
"cp": "fs cp",
"rm": "fs rm",
"mkdir": "fs mkdir",
"rmdir": "fs rmdir",
"df": [
"exec",
"import uos\nprint('mount \\tsize \\tused \\tavail \\tuse%')\nfor _m in [''] + uos.listdir('/'):\n _s = uos.stat('/' + _m)\n if not _s[0] & 1 << 14: continue\n _s = uos.statvfs(_m)\n if _s[0]:\n _size = _s[0] * _s[2]; _free = _s[0] * _s[3]; print(_m, _size, _size - _free, _free, int(100 * (_size - _free) / _size), sep='\\t')",
],
# Other shortcuts.
"reset t_ms=100": [
"exec",
"--no-follow",
"import utime, umachine; utime.sleep_ms(t_ms); umachine.reset()",
],
"bootloader t_ms=100": [
"exec",
"--no-follow",
"import utime, umachine; utime.sleep_ms(t_ms); umachine.bootloader()",
],
"setrtc": [
"exec",
"import machine; machine.RTC().datetime((2020, 1, 1, 0, 10, 0, 0, 0))",
],
}
def load_user_config():
# Create empty config object.
config = __build_class__(lambda: None, "Config")()
config.commands = {}
# Get config file name.
path = os.getenv("XDG_CONFIG_HOME")
if path is None:
path = os.getenv("HOME")
if path is None:
return config
path = os.path.join(path, ".config")
path = os.path.join(path, _PROG)
config_file = os.path.join(path, "config.py")
# Check if config file exists.
if not os.path.exists(config_file):
return config
# Exec the config file in its directory.
with open(config_file) as f:
config_data = f.read()
prev_cwd = os.getcwd()
os.chdir(path)
exec(config_data, config.__dict__)
os.chdir(prev_cwd)
return config
def prepare_command_expansions(config):
global _command_expansions
_command_expansions = {}
for command_set in (_BUILTIN_COMMAND_EXPANSIONS, config.commands):
for cmd, sub in command_set.items():
cmd = cmd.split()
if len(cmd) == 1:
args = ()
else:
args = tuple(c.split("=") for c in cmd[1:])
if isinstance(sub, str):
sub = sub.split()
_command_expansions[cmd[0]] = (args, sub)
def do_command_expansion(args):
def usage_error(cmd, exp_args, msg):
print(f"Command {cmd} {msg}; signature is:")
print(" ", cmd, " ".join("=".join(a) for a in exp_args))
sys.exit(1)
last_arg_idx = len(args)
pre = []
while args and args[0] in _command_expansions:
cmd = args.pop(0)
exp_args, exp_sub = _command_expansions[cmd]
for exp_arg in exp_args:
exp_arg_name = exp_arg[0]
if args and "=" not in args[0]:
# Argument given without a name.
value = args.pop(0)
elif args and args[0].startswith(exp_arg_name + "="):
# Argument given with correct name.
value = args.pop(0).split("=", 1)[1]
else:
# No argument given, or argument given with a different name.
if len(exp_arg) == 1:
# Required argument (it has no default).
usage_error(cmd, exp_args, f"missing argument {exp_arg_name}")
else:
# Optional argument with a default.
value = exp_arg[1]
pre.append(f"{exp_arg_name}={value}")
args[0:0] = exp_sub
last_arg_idx = len(exp_sub)
if last_arg_idx < len(args) and "=" in args[last_arg_idx]:
# Extra unknown arguments given.
arg = args[last_arg_idx].split("=", 1)[0]
usage_error(cmd, exp_args, f"given unexpected argument {arg}")
sys.exit(1)
# Insert expansion with optional setting of arguments.
if pre:
args[0:0] = ["exec", ";".join(pre)]
def do_connect(args):
dev = args.pop(0)
try:
if dev == "list":
# List attached devices.
for p in sorted(serial.tools.list_ports.comports()):
print(
"{} {} {:04x}:{:04x} {} {}".format(
p.device, p.serial_number, p.pid, p.vid, p.manufacturer, p.product
)
)
return None
elif dev == "auto":
# Auto-detect and auto-connect to the first available device.
for p in sorted(serial.tools.list_ports.comports()):
try:
return pyboard.PyboardExtended(p.device, baudrate=115200)
except pyboard.PyboardError as er:
if not er.args[0].startswith("failed to access"):
raise er
raise pyboard.PyboardError("no device found")
elif dev.startswith("id:"):
# Search for a device with the given serial number.
serial_number = dev[len("id:") :]
dev = None
for p in serial.tools.list_ports.comports():
if p.serial_number == serial_number:
return pyboard.PyboardExtended(p.device, baudrate=115200)
raise pyboard.PyboardError("no device with serial number {}".format(serial_number))
else:
# Connect to the given device.
if dev.startswith("port:"):
dev = dev[len("port:") :]
return pyboard.PyboardExtended(dev, baudrate=115200)
except pyboard.PyboardError as er:
msg = er.args[0]
if msg.startswith("failed to access"):
msg += " (it may be in use by another program)"
print(msg)
sys.exit(1)
def do_disconnect(pyb):
try:
if pyb.mounted:
if not pyb.in_raw_repl:
pyb.enter_raw_repl(soft_reset=False)
pyb.umount_local()
if pyb.in_raw_repl:
pyb.exit_raw_repl()
except OSError:
# Ignore any OSError exceptions when shutting down, eg:
# - pyboard.filesystem_command will close the connecton if it had an error
# - umounting will fail if serial port disappeared
pass
pyb.close()
def do_filesystem(pyb, args):
def _list_recursive(files, path):
if os.path.isdir(path):
for entry in os.listdir(path):
_list_recursive(files, os.path.join(path, entry))
else:
files.append(os.path.split(path))
if args[0] == "cp" and args[1] == "-r":
args.pop(0)
args.pop(0)
assert args[-1] == ":"
args.pop()
src_files = []
for path in args:
_list_recursive(src_files, path)
known_dirs = {""}
pyb.exec_("import uos")
for dir, file in src_files:
dir_parts = dir.split("/")
for i in range(len(dir_parts)):
d = "/".join(dir_parts[: i + 1])
if d not in known_dirs:
pyb.exec_("try:\n uos.mkdir('%s')\nexcept OSError as e:\n print(e)" % d)
known_dirs.add(d)
pyboard.filesystem_command(pyb, ["cp", os.path.join(dir, file), ":" + dir + "/"])
else:
pyboard.filesystem_command(pyb, args)
args.clear()
def do_repl_main_loop(pyb, console_in, console_out_write, *, code_to_inject, file_to_inject):
while True:
console_in.waitchar(pyb.serial)
c = console_in.readchar()
if c:
if c == b"\x1d": # ctrl-], quit
break
elif c == b"\x04": # ctrl-D
# do a soft reset and reload the filesystem hook
pyb.soft_reset_with_mount(console_out_write)
elif c == b"\x0a" and code_to_inject is not None: # ctrl-j, inject code
pyb.serial.write(code_to_inject)
elif c == b"\x0b" and file_to_inject is not None: # ctrl-k, inject script
console_out_write(bytes("Injecting %s\r\n" % file_to_inject, "utf8"))
pyb.enter_raw_repl(soft_reset=False)
with open(file_to_inject, "rb") as f:
pyfile = f.read()
try:
pyb.exec_raw_no_follow(pyfile)
except pyboard.PyboardError as er:
console_out_write(b"Error:\r\n")
console_out_write(er)
pyb.exit_raw_repl()
else:
pyb.serial.write(c)
try:
n = pyb.serial.inWaiting()
except OSError as er:
if er.args[0] == 5: # IO error, device disappeared
print("device disconnected")
break
if n > 0:
c = pyb.serial.read(1)
if c is not None:
# pass character through to the console
oc = ord(c)
if oc in (8, 9, 10, 13, 27) or 32 <= oc <= 126:
console_out_write(c)
else:
console_out_write(b"[%02x]" % ord(c))
def do_repl(pyb, args):
capture_file = None
code_to_inject = None
file_to_inject = None
while len(args):
if args[0] == "--capture":
args.pop(0)
capture_file = args.pop(0)
elif args[0] == "--inject-code":
args.pop(0)
code_to_inject = bytes(args.pop(0).replace("\\n", "\r\n"), "utf8")
elif args[0] == "--inject-file":
args.pop(0)
file_to_inject = args.pop(0)
else:
break
print("Connected to MicroPython at %s" % pyb.device_name)
print("Use Ctrl-] to exit this shell")
if capture_file is not None:
print('Capturing session to file "%s"' % capture_file)
capture_file = open(capture_file, "wb")
if code_to_inject is not None:
print("Use Ctrl-J to inject", code_to_inject)
if file_to_inject is not None:
print('Use Ctrl-K to inject file "%s"' % file_to_inject)
console = Console()
console.enter()
def console_out_write(b):
console.write(b)
if capture_file is not None:
capture_file.write(b)
capture_file.flush()
try:
do_repl_main_loop(
pyb,
console,
console_out_write,
code_to_inject=code_to_inject,
file_to_inject=file_to_inject,
)
finally:
console.exit()
if capture_file is not None:
capture_file.close()
def execbuffer(pyb, buf, follow):
ret_val = 0
try:
pyb.exec_raw_no_follow(buf)
if follow:
ret, ret_err = pyb.follow(timeout=None, data_consumer=pyboard.stdout_write_bytes)
if ret_err:
pyboard.stdout_write_bytes(ret_err)
ret_val = 1
except pyboard.PyboardError as er:
print(er)
ret_val = 1
except KeyboardInterrupt:
ret_val = 1
return ret_val
def main():
config = load_user_config()
prepare_command_expansions(config)
args = sys.argv[1:]
pyb = None
did_action = False
try:
while args:
do_command_expansion(args)
cmds = {
"connect": (False, False, 1),
"disconnect": (False, False, 0),
"mount": (True, False, 1),
"repl": (False, True, 0),
"eval": (True, True, 1),
"exec": (True, True, 1),
"run": (True, True, 1),
"fs": (True, True, 1),
}
cmd = args.pop(0)
try:
need_raw_repl, is_action, num_args_min = cmds[cmd]
except KeyError:
print(f"{_PROG}: '{cmd}' is not a command")
return 1
if len(args) < num_args_min:
print(f"{_PROG}: '{cmd}' neads at least {num_args_min} argument(s)")
return 1
if cmd == "connect":
if pyb is not None:
do_disconnect(pyb)
pyb = do_connect(args)
if pyb is None:
did_action = True
continue
if pyb is None:
pyb = do_connect(["auto"])
if need_raw_repl:
if not pyb.in_raw_repl:
pyb.enter_raw_repl()
else:
if pyb.in_raw_repl:
pyb.exit_raw_repl()
if is_action:
did_action = True
if cmd == "disconnect":
do_disconnect(pyb)
pyb = None
elif cmd == "mount":
path = args.pop(0)
pyb.mount_local(path)
print(f"Local directory {path} is mounted at /remote")
elif cmd in ("exec", "eval", "run"):
follow = True
if args[0] == "--no-follow":
args.pop(0)
follow = False
if cmd == "exec":
buf = args.pop(0)
elif cmd == "eval":
buf = "print(" + args.pop(0) + ")"
else:
filename = args.pop(0)
try:
with open(filename, "rb") as f:
buf = f.read()
except OSError:
print(f"{_PROG}: could not read file '{filename}'")
return 1
ret = execbuffer(pyb, buf, follow)
if ret:
return ret
elif cmd == "fs":
do_filesystem(pyb, args)
elif cmd == "repl":
do_repl(pyb, args)
if not did_action:
if pyb is None:
pyb = do_connect(["auto"])
if pyb.in_raw_repl:
pyb.exit_raw_repl()
do_repl(pyb, args)
finally:
if pyb is not None:
do_disconnect(pyb)

View File

@ -1,621 +0,0 @@
import os, re, serial, struct, time
from errno import EPERM
from .console import VT_ENABLED
try:
from .pyboard import Pyboard, PyboardError, stdout_write_bytes, filesystem_command
except ImportError:
import sys
sys.path.append(os.path.dirname(__file__) + "/../..")
from pyboard import Pyboard, PyboardError, stdout_write_bytes, filesystem_command
fs_hook_cmds = {
"CMD_STAT": 1,
"CMD_ILISTDIR_START": 2,
"CMD_ILISTDIR_NEXT": 3,
"CMD_OPEN": 4,
"CMD_CLOSE": 5,
"CMD_READ": 6,
"CMD_WRITE": 7,
"CMD_SEEK": 8,
"CMD_REMOVE": 9,
"CMD_RENAME": 10,
}
fs_hook_code = """\
import uos, uio, ustruct, micropython, usys
class RemoteCommand:
def __init__(self, use_second_port):
self.buf4 = bytearray(4)
try:
import pyb
self.fout = pyb.USB_VCP()
if use_second_port:
self.fin = pyb.USB_VCP(1)
else:
self.fin = pyb.USB_VCP()
except:
import usys
self.fout = usys.stdout.buffer
self.fin = usys.stdin.buffer
import select
self.poller = select.poll()
self.poller.register(self.fin, select.POLLIN)
def poll_in(self):
for _ in self.poller.ipoll(1000):
return
self.end()
raise Exception('timeout waiting for remote')
def rd(self, n):
buf = bytearray(n)
self.rd_into(buf, n)
return buf
def rd_into(self, buf, n):
# implement reading with a timeout in case other side disappears
if n == 0:
return
self.poll_in()
r = self.fin.readinto(buf, n)
if r < n:
mv = memoryview(buf)
while r < n:
self.poll_in()
r += self.fin.readinto(mv[r:], n - r)
def begin(self, type):
micropython.kbd_intr(-1)
buf4 = self.buf4
buf4[0] = 0x18
buf4[1] = type
self.fout.write(buf4, 2)
# Wait for sync byte 0x18, but don't get stuck forever
for i in range(30):
self.poller.poll(1000)
self.fin.readinto(buf4, 1)
if buf4[0] == 0x18:
break
def end(self):
micropython.kbd_intr(3)
def rd_s8(self):
self.rd_into(self.buf4, 1)
n = self.buf4[0]
if n & 0x80:
n -= 0x100
return n
def rd_s32(self):
buf4 = self.buf4
self.rd_into(buf4, 4)
n = buf4[0] | buf4[1] << 8 | buf4[2] << 16 | buf4[3] << 24
if buf4[3] & 0x80:
n -= 0x100000000
return n
def rd_u32(self):
buf4 = self.buf4
self.rd_into(buf4, 4)
return buf4[0] | buf4[1] << 8 | buf4[2] << 16 | buf4[3] << 24
def rd_bytes(self, buf):
# TODO if n is large (eg >256) then we may miss bytes on stdin
n = self.rd_s32()
if buf is None:
ret = buf = bytearray(n)
else:
ret = n
self.rd_into(buf, n)
return ret
def rd_str(self):
n = self.rd_s32()
if n == 0:
return ''
else:
return str(self.rd(n), 'utf8')
def wr_s8(self, i):
self.buf4[0] = i
self.fout.write(self.buf4, 1)
def wr_s32(self, i):
ustruct.pack_into('<i', self.buf4, 0, i)
self.fout.write(self.buf4)
def wr_bytes(self, b):
self.wr_s32(len(b))
self.fout.write(b)
# str and bytes act the same in MicroPython
wr_str = wr_bytes
class RemoteFile(uio.IOBase):
def __init__(self, cmd, fd, is_text):
self.cmd = cmd
self.fd = fd
self.is_text = is_text
def __enter__(self):
return self
def __exit__(self, a, b, c):
self.close()
def ioctl(self, request, arg):
if request == 4: # CLOSE
self.close()
return 0
def flush(self):
pass
def close(self):
if self.fd is None:
return
c = self.cmd
c.begin(CMD_CLOSE)
c.wr_s8(self.fd)
c.end()
self.fd = None
def read(self, n=-1):
c = self.cmd
c.begin(CMD_READ)
c.wr_s8(self.fd)
c.wr_s32(n)
data = c.rd_bytes(None)
c.end()
if self.is_text:
data = str(data, 'utf8')
else:
data = bytes(data)
return data
def readinto(self, buf):
c = self.cmd
c.begin(CMD_READ)
c.wr_s8(self.fd)
c.wr_s32(len(buf))
n = c.rd_bytes(buf)
c.end()
return n
def readline(self):
l = ''
while 1:
c = self.read(1)
l += c
if c == '\\n' or c == '':
return l
def readlines(self):
ls = []
while 1:
l = self.readline()
if not l:
return ls
ls.append(l)
def write(self, buf):
c = self.cmd
c.begin(CMD_WRITE)
c.wr_s8(self.fd)
c.wr_bytes(buf)
n = c.rd_s32()
c.end()
return n
def seek(self, n):
c = self.cmd
c.begin(CMD_SEEK)
c.wr_s8(self.fd)
c.wr_s32(n)
n = c.rd_s32()
c.end()
return n
class RemoteFS:
def __init__(self, cmd):
self.cmd = cmd
def mount(self, readonly, mkfs):
pass
def umount(self):
pass
def chdir(self, path):
if not path.startswith("/"):
path = self.path + path
if not path.endswith("/"):
path += "/"
if path != "/":
self.stat(path)
self.path = path
def getcwd(self):
return self.path
def remove(self, path):
c = self.cmd
c.begin(CMD_REMOVE)
c.wr_str(self.path + path)
res = c.rd_s32()
c.end()
if res < 0:
raise OSError(-res)
def rename(self, old, new):
c = self.cmd
c.begin(CMD_RENAME)
c.wr_str(self.path + old)
c.wr_str(self.path + new)
res = c.rd_s32()
c.end()
if res < 0:
raise OSError(-res)
def stat(self, path):
c = self.cmd
c.begin(CMD_STAT)
c.wr_str(self.path + path)
res = c.rd_s8()
if res < 0:
c.end()
raise OSError(-res)
mode = c.rd_u32()
size = c.rd_u32()
atime = c.rd_u32()
mtime = c.rd_u32()
ctime = c.rd_u32()
c.end()
return mode, 0, 0, 0, 0, 0, size, atime, mtime, ctime
def ilistdir(self, path):
c = self.cmd
c.begin(CMD_ILISTDIR_START)
c.wr_str(self.path + path)
res = c.rd_s8()
c.end()
if res < 0:
raise OSError(-res)
def next():
while True:
c.begin(CMD_ILISTDIR_NEXT)
name = c.rd_str()
if name:
type = c.rd_u32()
c.end()
yield (name, type, 0)
else:
c.end()
break
return next()
def open(self, path, mode):
c = self.cmd
c.begin(CMD_OPEN)
c.wr_str(self.path + path)
c.wr_str(mode)
fd = c.rd_s8()
c.end()
if fd < 0:
raise OSError(-fd)
return RemoteFile(c, fd, mode.find('b') == -1)
def __mount(use_second_port):
uos.mount(RemoteFS(RemoteCommand(use_second_port)), '/remote')
uos.chdir('/remote')
"""
# Apply basic compression on hook code.
for key, value in fs_hook_cmds.items():
fs_hook_code = re.sub(key, str(value), fs_hook_code)
fs_hook_code = re.sub(" *#.*$", "", fs_hook_code, flags=re.MULTILINE)
fs_hook_code = re.sub("\n\n+", "\n", fs_hook_code)
fs_hook_code = re.sub(" ", " ", fs_hook_code)
fs_hook_code = re.sub("rd_", "r", fs_hook_code)
fs_hook_code = re.sub("wr_", "w", fs_hook_code)
fs_hook_code = re.sub("buf4", "b4", fs_hook_code)
class PyboardCommand:
def __init__(self, fin, fout, path):
self.fin = fin
self.fout = fout
self.root = path + "/"
self.data_ilistdir = ["", []]
self.data_files = []
def rd_s8(self):
return struct.unpack("<b", self.fin.read(1))[0]
def rd_s32(self):
return struct.unpack("<i", self.fin.read(4))[0]
def rd_bytes(self):
n = self.rd_s32()
return self.fin.read(n)
def rd_str(self):
n = self.rd_s32()
if n == 0:
return ""
else:
return str(self.fin.read(n), "utf8")
def wr_s8(self, i):
self.fout.write(struct.pack("<b", i))
def wr_s32(self, i):
self.fout.write(struct.pack("<i", i))
def wr_u32(self, i):
self.fout.write(struct.pack("<I", i))
def wr_bytes(self, b):
self.wr_s32(len(b))
self.fout.write(b)
def wr_str(self, s):
b = bytes(s, "utf8")
self.wr_s32(len(b))
self.fout.write(b)
def log_cmd(self, msg):
print(f"[{msg}]", end="\r\n")
def path_check(self, path):
parent = os.path.realpath(self.root)
child = os.path.realpath(path)
if parent != os.path.commonpath([parent, child]):
raise OSError(EPERM, "") # File is outside mounted dir
def do_stat(self):
path = self.root + self.rd_str()
# self.log_cmd(f"stat {path}")
try:
self.path_check(path)
stat = os.stat(path)
except OSError as er:
self.wr_s8(-abs(er.errno))
else:
self.wr_s8(0)
# Note: st_ino would need to be 64-bit if added here
self.wr_u32(stat.st_mode)
self.wr_u32(stat.st_size)
self.wr_u32(int(stat.st_atime))
self.wr_u32(int(stat.st_mtime))
self.wr_u32(int(stat.st_ctime))
def do_ilistdir_start(self):
path = self.root + self.rd_str()
try:
self.path_check(path)
self.wr_s8(0)
except OSError as er:
self.wr_s8(-abs(er.errno))
else:
self.data_ilistdir[0] = path
self.data_ilistdir[1] = os.listdir(path)
def do_ilistdir_next(self):
if self.data_ilistdir[1]:
entry = self.data_ilistdir[1].pop(0)
try:
stat = os.lstat(self.data_ilistdir[0] + "/" + entry)
mode = stat.st_mode & 0xC000
except OSError as er:
mode = 0
self.wr_str(entry)
self.wr_u32(mode)
else:
self.wr_str("")
def do_open(self):
path = self.root + self.rd_str()
mode = self.rd_str()
# self.log_cmd(f"open {path} {mode}")
try:
self.path_check(path)
f = open(path, mode)
except OSError as er:
self.wr_s8(-abs(er.errno))
else:
is_text = mode.find("b") == -1
try:
fd = self.data_files.index(None)
self.data_files[fd] = (f, is_text)
except ValueError:
fd = len(self.data_files)
self.data_files.append((f, is_text))
self.wr_s8(fd)
def do_close(self):
fd = self.rd_s8()
# self.log_cmd(f"close {fd}")
self.data_files[fd][0].close()
self.data_files[fd] = None
def do_read(self):
fd = self.rd_s8()
n = self.rd_s32()
buf = self.data_files[fd][0].read(n)
if self.data_files[fd][1]:
buf = bytes(buf, "utf8")
self.wr_bytes(buf)
# self.log_cmd(f"read {fd} {n} -> {len(buf)}")
def do_seek(self):
fd = self.rd_s8()
n = self.rd_s32()
# self.log_cmd(f"seek {fd} {n}")
self.data_files[fd][0].seek(n)
self.wr_s32(n)
def do_write(self):
fd = self.rd_s8()
buf = self.rd_bytes()
if self.data_files[fd][1]:
buf = str(buf, "utf8")
n = self.data_files[fd][0].write(buf)
self.wr_s32(n)
# self.log_cmd(f"write {fd} {len(buf)} -> {n}")
def do_remove(self):
path = self.root + self.rd_str()
# self.log_cmd(f"remove {path}")
try:
self.path_check(path)
os.remove(path)
ret = 0
except OSError as er:
ret = -abs(er.errno)
self.wr_s32(ret)
def do_rename(self):
old = self.root + self.rd_str()
new = self.root + self.rd_str()
# self.log_cmd(f"rename {old} {new}")
try:
self.path_check(old)
self.path_check(new)
os.rename(old, new)
ret = 0
except OSError as er:
ret = -abs(er.errno)
self.wr_s32(ret)
cmd_table = {
fs_hook_cmds["CMD_STAT"]: do_stat,
fs_hook_cmds["CMD_ILISTDIR_START"]: do_ilistdir_start,
fs_hook_cmds["CMD_ILISTDIR_NEXT"]: do_ilistdir_next,
fs_hook_cmds["CMD_OPEN"]: do_open,
fs_hook_cmds["CMD_CLOSE"]: do_close,
fs_hook_cmds["CMD_READ"]: do_read,
fs_hook_cmds["CMD_WRITE"]: do_write,
fs_hook_cmds["CMD_SEEK"]: do_seek,
fs_hook_cmds["CMD_REMOVE"]: do_remove,
fs_hook_cmds["CMD_RENAME"]: do_rename,
}
class SerialIntercept:
def __init__(self, serial, cmd):
self.orig_serial = serial
self.cmd = cmd
self.buf = b""
self.orig_serial.timeout = 5.0
def _check_input(self, blocking):
if blocking or self.orig_serial.inWaiting() > 0:
c = self.orig_serial.read(1)
if c == b"\x18":
# a special command
c = self.orig_serial.read(1)[0]
self.orig_serial.write(b"\x18") # Acknowledge command
PyboardCommand.cmd_table[c](self.cmd)
elif not VT_ENABLED and c == b"\x1b":
# ESC code, ignore these on windows
esctype = self.orig_serial.read(1)
if esctype == b"[": # CSI
while not (0x40 < self.orig_serial.read(1)[0] < 0x7E):
# Looking for "final byte" of escape sequence
pass
else:
self.buf += c
@property
def fd(self):
return self.orig_serial.fd
def close(self):
self.orig_serial.close()
def inWaiting(self):
self._check_input(False)
return len(self.buf)
def read(self, n):
while len(self.buf) < n:
self._check_input(True)
out = self.buf[:n]
self.buf = self.buf[n:]
return out
def write(self, buf):
self.orig_serial.write(buf)
class PyboardExtended(Pyboard):
def __init__(self, dev, *args, **kwargs):
super().__init__(dev, *args, **kwargs)
self.device_name = dev
self.mounted = False
def mount_local(self, path, dev_out=None):
fout = self.serial
if dev_out is not None:
try:
fout = serial.Serial(dev_out)
except serial.SerialException:
port = list(serial.tools.list_ports.grep(dev_out))
if not port:
raise
for p in port:
try:
fout = serial.Serial(p.device)
break
except serial.SerialException:
pass
self.mounted = True
if self.eval('"RemoteFS" in globals()') == b"False":
self.exec_(fs_hook_code)
self.exec_("__mount(%s)" % (dev_out is not None))
self.cmd = PyboardCommand(self.serial, fout, path)
self.serial = SerialIntercept(self.serial, self.cmd)
self.dev_out = dev_out
def soft_reset_with_mount(self, out_callback):
self.serial.write(b"\x04")
if not self.mounted:
return
# Wait for a response to the soft-reset command.
for i in range(10):
if self.serial.inWaiting():
break
time.sleep(0.05)
else:
# Device didn't respond so it wasn't in a state to do a soft reset.
return
out_callback(self.serial.read(1))
self.serial = self.serial.orig_serial
n = self.serial.inWaiting()
while n > 0:
buf = self.serial.read(n)
out_callback(buf)
time.sleep(0.1)
n = self.serial.inWaiting()
self.serial.write(b"\x01")
self.exec_(fs_hook_code)
self.exec_("__mount(%s)" % (self.dev_out is not None))
self.exit_raw_repl()
self.read_until(4, b">>> ")
self.serial = SerialIntercept(self.serial, self.cmd)
def umount_local(self):
if self.mounted:
self.exec_('uos.umount("/remote")')
self.mounted = False

View File

@ -1,6 +0,0 @@
[build-system]
requires = [
"setuptools>=42",
"wheel"
]
build-backend = "setuptools.build_meta"

View File

@ -1,25 +0,0 @@
[metadata]
name = mpremote
version = 0.0.5
author = Damien George
author_email = damien@micropython.org
description = Tool for interacting remotely with MicroPython
long_description = file: README.md
long_description_content_type = text/markdown
url = https://github.com/micropython/micropython
project_urls =
Bug Tracker = https://github.com/micropython/micropython/issues
classifiers =
Programming Language :: Python :: 3
License :: OSI Approved :: MIT License
Operating System :: OS Independent
[options]
packages = mpremote
python_requires = >= 3.4
install_requires =
pyserial >= 3.3
[options.entry_points]
console_scripts =
mpremote = mpremote.main:main