Add MixGo CE board

This commit is contained in:
dahanzimin 2022-05-10 18:56:13 +08:00
parent cbefa96bad
commit d48f961d0a
27 changed files with 4092 additions and 0 deletions

View File

@ -0,0 +1,48 @@
/*
* This file is part of the MicroPython project, http://micropython.org/
*
* The MIT License (MIT)
*
* Copyright (c) 2020 Scott Shawcroft for Adafruit Industries
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "supervisor/board.h"
#include "mpconfigboard.h"
#include "shared-bindings/microcontroller/Pin.h"
void board_init(void) {
// Debug UART
#ifdef DEBUG
common_hal_never_reset_pin(&pin_GPIO43);
common_hal_never_reset_pin(&pin_GPIO44);
#endif /* DEBUG */
}
bool board_requests_safe_mode(void) {
return false;
}
void reset_board(void) {
}
void board_deinit(void) {
}

View File

@ -0,0 +1,58 @@
"""
Image
CircuitPython library for Image - MixGoCE
=======================================================
Small Cabbage
20210721
"""
HEART=bytearray(b'\x00\x00\x00\x0c\x1e\x3f\x7e\xfc\x7e\x3f\x1e\x0c\x00\x00\x00\x00')
HEART_SMALL=bytearray(b'\x00\x00\x00\x00\x0c\x1e\x3c\x78\x3c\x1e\x0c\x00\x00\x00\x00\x00')
HAPPY=bytearray(b'\x00\x00\x00\x0c\x0c\x20\x40\x80\x80\x40\x20\x0c\x0c\x00\x00\x00')
SAD=bytearray(b'\x00\x00\x08\x04\x04\x84\x40\x20\x20\x40\x84\x04\x04\x08\x00\x00')
SMILE=bytearray(b'\x00\x00\x04\x02\x02\x24\x40\x80\x80\x40\x24\x02\x02\x04\x00\x00')
SILLY=bytearray(b'\x00\x00\x04\x0a\x0a\x04\x60\xa0\xa0\x60\x04\x0a\x0a\x04\x00\x00')
FABULOUS=bytearray(b'\x00\x00\x06\x05\x05\x65\xa6\xa0\xa0\xa6\x65\x05\x05\x06\x00\x00')
SURPRISED=bytearray(b'\x00\x00\x00\x00\x00\x00\x00\xbe\x00\x00\x00\x00\x00\x00\x00\x00')
ASLEEP=bytearray(b'\x00\x00\x04\x04\x04\x44\xa0\xa0\xa0\xa0\x44\x04\x04\x04\x00\x00')
ANGRY=bytearray(b'\x00\x00\x01\x02\x84\x42\x21\x10\x10\x21\x42\x84\x02\x01\x00\x00')
CONFUSED=bytearray(b'\x00\x00\x00\x00\x00\x04\x02\xa2\x12\x0c\x00\x00\x00\x00\x00\x00')
NO=bytearray(b'\x00\x00\x00\x00\x42\x24\x18\x18\x24\x42\x00\x00\x00\x00\x00\x00')
YES=bytearray(b'\x00\x00\x00\x00\x10\x20\x40\x40\x20\x10\x08\x04\x02\x00\x00\x00')
LEFT_ARROW=bytearray(b'\x00\x00\x10\x28\x54\x92\x10\x10\x10\x10\x10\x10\x10\x10\x00\x00')
RIGHT_ARROW=bytearray(b'\x00\x00\x10\x10\x10\x10\x10\x10\x10\x10\x92\x54\x28\x10\x00\x00')
DRESS=bytearray(b'\x00\x00\x00\x40\xa2\xd7\xae\xda\xae\xd7\xa2\x40\x00\x00\x00\x00')
TRANSFORMERS=bytearray(b'\x00\x00\x00\x00\x00\x9c\x64\x1a\x64\x9c\x00\x00\x00\x00\x00\x00')
SCISSORS=bytearray(b'\x00\x00\x00\x00\x00\x43\xa6\x6c\x18\x6c\xa6\x43\x00\x00\x00\x00')
EXIT=bytearray(b'\x00\x00\x00\x00\x00\x00\x28\x04\x72\x0e\x17\x25\x48\x88\x00\x00')
TREE=bytearray(b'\x00\x00\x00\x10\x18\x1c\x1e\xff\x1e\x1c\x18\x10\x00\x00\x00\x00')
PACMAN=bytearray(b'\x00\x00\x1c\x36\x63\x41\x45\x41\x49\x55\x22\x00\x00\x00\x00\x00')
TARGET=bytearray(b'\x00\x00\x00\x00\x00\x00\x38\x44\x54\x44\x38\x00\x00\x00\x00\x00')
TSHIRT=bytearray(b'\x00\x00\x00\x04\x0a\xf9\x82\x82\x82\x82\xf9\x0a\x04\x00\x00\x00')
ROLLERSKATE=bytearray(b'\x00\x00\x00\x60\x5f\x71\x11\x17\x14\x14\x74\x58\x60\x00\x00\x00')
DUCK=bytearray(b'\x00\x00\x08\x0c\x0a\xf9\x81\x83\x9e\x90\x90\x90\x50\x30\x10\x00')
HOUSE=bytearray(b'\x04\x06\xfb\x01\x01\xf9\x09\x29\x09\x09\xf9\x01\x01\xfb\x06\x04')
TORTOISE=bytearray(b'\x00\x00\x00\x00\x00\x5e\x3c\x3f\x3f\x3c\x5e\x00\x00\x00\x00\x00')
BUTTERFLY=bytearray(b'\x00\x00\x00\x00\x04\xca\xaa\x5c\x38\x5c\xaa\xca\x04\x00\x00\x00')
STICKFIGURE=bytearray(b'\x00\x00\x00\x00\x00\x90\x4a\x3d\x4a\x90\x00\x00\x00\x00\x00\x00')
GHOST=bytearray(b'\x00\x00\x00\x00\xfe\xdf\xe9\xdf\xe9\xdf\xfe\xc0\x80\x00\x00\x00')
PITCHFORK=bytearray(b'\x08\x08\x08\x08\x08\x08\x08\x08\x08\x08\x3e\x49\x49\x49\x49\x49')
MUSIC_QUAVERS=bytearray(b'\x20\x10\x08\x0c\x1c\x38\x30\x10\x08\x0c\x1c\x38\x30\x10\x08\x04')
MUSIC_QUAVER=bytearray(b'\x06\x05\x05\x05\x05\x05\x05\x05\x36\x7c\xfc\xfc\xf8\x70\x00\x00')
MUSIC_CROTCHET=bytearray(b'\x02\x02\x02\x02\x02\x02\x02\x02\x1a\x3e\x7e\x7e\x7c\x38\x00\x00')
COW=bytearray(b'\x00\x1e\x1a\x1f\xfe\xfc\x1c\x1c\x1c\xfc\xfc\x08\x10\x10\x20\x00')
RABBIT=bytearray(b'\x14\x2a\x2a\x2a\x2a\x2a\x63\x41\x55\x41\x49\x49\x5d\x41\x3e\x00')
SQUARE_SMALL=bytearray(b'\x00\x00\x00\x00\x00\x00\x3c\x24\x24\x3c\x00\x00\x00\x00\x00\x00')
SQUARE=bytearray(b'\x00\x00\x00\x00\xff\x81\x81\x81\x81\x81\x81\xff\x00\x00\x00\x00')
DIAMOND_SMALL=bytearray(b'\x00\x00\x00\x00\x00\x08\x14\x2c\x14\x08\x00\x00\x00\x00\x00\x00')
DIAMOND=bytearray(b'\x00\x00\x04\x0e\x1b\x35\x6f\xdd\x6f\x35\x1b\x0e\x04\x00\x00\x00')
CHESSBOARD=bytearray(b'\x00\x00\x00\xfe\xaa\xfe\xaa\xfe\xaa\xfe\xaa\xfe\xaa\xfe\x00\x00')
TRIANGLE_LEFT=bytearray(b'\x00\x00\x00\x00\x00\x10\x38\x7c\xfe\x00\x00\x00\x00\x00\x00\x00')
TRIANGLE=bytearray(b'\x00\x00\x40\x60\x70\x78\x7c\x7e\x7e\x7c\x78\x70\x60\x40\x00\x00')
SNAKE=bytearray(b'\x00\x40\x60\x70\x38\x18\x18\x18\x18\x1f\x05\x07\x00\x00\x00\x00')
UMBRELLA=bytearray(b'\x00\x00\x00\x08\x0c\x0e\x0e\xff\x8e\xce\x0c\x08\x00\x00\x00\x00')
SKULL=bytearray(b'\x00\x00\x00\x0c\x1a\x79\x99\xd7\x99\x79\x1a\x0c\x00\x00\x00\x00')
GIRAFFE=bytearray(b'\x00\x00\x00\x00\xe0\x20\x20\xff\x03\x02\x00\x00\x00\x00\x00\x00')
SWORD=bytearray(b'\x00\x00\x18\x18\x7e\x3c\x18\x18\x18\x18\x18\x18\x18\x00\x00\x00')

View File

@ -0,0 +1,549 @@
# The MIT License (MIT)
#
# Copyright (c) 2018 Kattni Rembor for Adafruit Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
`adafruit_framebuf`
====================================================
CircuitPython pure-python framebuf module, based on the micropython framebuf module.
* Author(s): Melissa LeBlanc-Williams, Kattni Rembor, Tony DiCola, original file
created by Damien P. George
Implementation Notes
--------------------
**Hardware:**
* `Adafruit SSD1306 OLED displays <https://www.adafruit.com/?q=ssd1306>`_
**Software and Dependencies:**
* Adafruit CircuitPython firmware for the supported boards:
https://github.com/adafruit/circuitpython/releases
"""
__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_framebuf.git"
import os
import struct
# Framebuf format constants:
MVLSB = 0 # Single bit displays (like SSD1306 OLED)
RGB565 = 1 # 16-bit color displays
GS4_HMSB = 2 # Unimplemented!
MHMSB = 3 # Single bit displays like the Sharp Memory
RGB888 = 4 # Neopixels and Dotstars
class MHMSBFormat:
"""MHMSBFormat"""
@staticmethod
def set_pixel(framebuf, x, y, color):
"""Set a given pixel to a color."""
index = (y * framebuf.stride + x) // 8
offset = 7 - x & 0x07
framebuf.buf[index] = (framebuf.buf[index] & ~(0x01 << offset)) | (
(color != 0) << offset
)
@staticmethod
def get_pixel(framebuf, x, y):
"""Get the color of a given pixel"""
index = (y * framebuf.stride + x) // 8
offset = 7 - x & 0x07
return (framebuf.buf[index] >> offset) & 0x01
@staticmethod
def fill(framebuf, color):
"""completely fill/clear the buffer with a color"""
if color:
fill = 0xFF
else:
fill = 0x00
for i in range(len(framebuf.buf)):
framebuf.buf[i] = fill
@staticmethod
def fill_rect(framebuf, x, y, width, height, color):
"""Draw a rectangle at the given location, size and color. The ``fill_rect`` method draws
both the outline and interior."""
# pylint: disable=too-many-arguments
for _x in range(x, x + width):
offset = 7 - _x & 0x07
for _y in range(y, y + height):
index = (_y * framebuf.stride + _x) // 8
framebuf.buf[index] = (framebuf.buf[index] & ~(0x01 << offset)) | (
(color != 0) << offset
)
class MVLSBFormat:
"""MVLSBFormat"""
@staticmethod
def set_pixel(framebuf, x, y, color):
"""Set a given pixel to a color."""
index = (y >> 3) * framebuf.stride + x
offset = y & 0x07
framebuf.buf[index] = (framebuf.buf[index] & ~(0x01 << offset)) | (
(color != 0) << offset
)
@staticmethod
def get_pixel(framebuf, x, y):
"""Get the color of a given pixel"""
index = (y >> 3) * framebuf.stride + x
offset = y & 0x07
return (framebuf.buf[index] >> offset) & 0x01
@staticmethod
def fill(framebuf, color):
"""completely fill/clear the buffer with a color"""
if color:
fill = 0xFF
else:
fill = 0x00
for i in range(len(framebuf.buf)):
framebuf.buf[i] = fill
@staticmethod
def fill_rect(framebuf, x, y, width, height, color):
"""Draw a rectangle at the given location, size and color. The ``fill_rect`` method draws
both the outline and interior."""
# pylint: disable=too-many-arguments
while height > 0:
index = (y >> 3) * framebuf.stride + x
offset = y & 0x07
for w_w in range(width):
framebuf.buf[index + w_w] = (
framebuf.buf[index + w_w] & ~(0x01 << offset)
) | ((color != 0) << offset)
y += 1
height -= 1
class RGB888Format:
"""RGB888Format"""
@staticmethod
def set_pixel(framebuf, x, y, color):
"""Set a given pixel to a color."""
index = (y * framebuf.stride + x) * 3
if isinstance(color, tuple):
framebuf.buf[index : index + 3] = bytes(color)
else:
framebuf.buf[index : index + 3] = bytes(
((color >> 16) & 255, (color >> 8) & 255, color & 255)
)
@staticmethod
def get_pixel(framebuf, x, y):
"""Get the color of a given pixel"""
index = (y * framebuf.stride + x) * 3
return (
(framebuf.buf[index] << 16)
| (framebuf.buf[index + 1] << 8)
| framebuf.buf[index + 2]
)
@staticmethod
def fill(framebuf, color):
"""completely fill/clear the buffer with a color"""
fill = (color >> 16) & 255, (color >> 8) & 255, color & 255
for i in range(0, len(framebuf.buf), 3):
framebuf.buf[i : i + 3] = bytes(fill)
@staticmethod
def fill_rect(framebuf, x, y, width, height, color):
"""Draw a rectangle at the given location, size and color. The ``fill_rect`` method draws
both the outline and interior."""
# pylint: disable=too-many-arguments
fill = (color >> 16) & 255, (color >> 8) & 255, color & 255
for _x in range(x, x + width):
for _y in range(y, y + height):
index = (_y * framebuf.stride + _x) * 3
framebuf.buf[index : index + 3] = bytes(fill)
class FrameBuffer:
"""FrameBuffer object.
:param buf: An object with a buffer protocol which must be large enough to contain every
pixel defined by the width, height and format of the FrameBuffer.
:param width: The width of the FrameBuffer in pixel
:param height: The height of the FrameBuffer in pixel
:param buf_format: Specifies the type of pixel used in the FrameBuffer; permissible values
are listed under Constants below. These set the number of bits used to
encode a color value and the layout of these bits in ``buf``. Where a
color value c is passed to a method, c is a small integer with an encoding
that is dependent on the format of the FrameBuffer.
:param stride: The number of pixels between each horizontal line of pixels in the
FrameBuffer. This defaults to ``width`` but may need adjustments when
implementing a FrameBuffer within another larger FrameBuffer or screen. The
``buf`` size must accommodate an increased step size.
"""
def __init__(self, buf, width, height, buf_format=MVLSB, stride=None):
# pylint: disable=too-many-arguments
self.buf = buf
self.width = width
self.height = height
self.stride = stride
self._font = None
if self.stride is None:
self.stride = width
if buf_format == MVLSB:
self.format = MVLSBFormat()
elif buf_format == MHMSB:
self.format = MHMSBFormat()
elif buf_format == RGB888:
self.format = RGB888Format()
else:
raise ValueError("invalid format")
self._rotation = 0
@property
def rotation(self):
"""The rotation setting of the display, can be one of (0, 1, 2, 3)"""
return self._rotation
@rotation.setter
def rotation(self, val):
if not val in (0, 1, 2, 3):
raise RuntimeError("Bad rotation setting")
self._rotation = val
def fill(self, color):
"""Fill the entire FrameBuffer with the specified color."""
self.format.fill(self, color)
def fill_rect(self, x, y, width, height, color):
"""Draw a rectangle at the given location, size and color. The ``fill_rect`` method draws
both the outline and interior."""
# pylint: disable=too-many-arguments, too-many-boolean-expressions
self.rect(x, y, width, height, color, fill=True)
def pixel(self, x, y, color=None):
"""If ``color`` is not given, get the color value of the specified pixel. If ``color`` is
given, set the specified pixel to the given color."""
if self.rotation == 1:
x, y = y, x
x = self.width - x - 1
if self.rotation == 2:
x = self.width - x - 1
y = self.height - y - 1
if self.rotation == 3:
x, y = y, x
y = self.height - y - 1
if x < 0 or x >= self.width or y < 0 or y >= self.height:
return None
if color is None:
return self.format.get_pixel(self, x, y)
self.format.set_pixel(self, x, y, color)
return None
def hline(self, x, y, width, color):
"""Draw a horizontal line up to a given length."""
self.rect(x, y, width, 1, color, fill=True)
def vline(self, x, y, height, color):
"""Draw a vertical line up to a given length."""
self.rect(x, y, 1, height, color, fill=True)
def circle(self, center_x, center_y, radius, color):
"""Draw a circle at the given midpoint location, radius and color.
The ```circle``` method draws only a 1 pixel outline."""
x = radius - 1
y = 0
d_x = 1
d_y = 1
err = d_x - (radius << 1)
while x >= y:
self.pixel(center_x + x, center_y + y, color)
self.pixel(center_x + y, center_y + x, color)
self.pixel(center_x - y, center_y + x, color)
self.pixel(center_x - x, center_y + y, color)
self.pixel(center_x - x, center_y - y, color)
self.pixel(center_x - y, center_y - x, color)
self.pixel(center_x + y, center_y - x, color)
self.pixel(center_x + x, center_y - y, color)
if err <= 0:
y += 1
err += d_y
d_y += 2
if err > 0:
x -= 1
d_x += 2
err += d_x - (radius << 1)
def rect(self, x, y, width, height, color, *, fill=False):
"""Draw a rectangle at the given location, size and color. The ```rect``` method draws only
a 1 pixel outline."""
# pylint: disable=too-many-arguments
if self.rotation == 1:
x, y = y, x
width, height = height, width
x = self.width - x - width
if self.rotation == 2:
x = self.width - x - width
y = self.height - y - height
if self.rotation == 3:
x, y = y, x
width, height = height, width
y = self.height - y - height
# pylint: disable=too-many-boolean-expressions
if (
width < 1
or height < 1
or (x + width) <= 0
or (y + height) <= 0
or y >= self.height
or x >= self.width
):
return
x_end = min(self.width - 1, x + width - 1)
y_end = min(self.height - 1, y + height - 1)
x = max(x, 0)
y = max(y, 0)
if fill:
self.format.fill_rect(self, x, y, x_end - x + 1, y_end - y + 1, color)
else:
self.format.fill_rect(self, x, y, x_end - x + 1, 1, color)
self.format.fill_rect(self, x, y, 1, y_end - y + 1, color)
self.format.fill_rect(self, x, y_end, x_end - x + 1, 1, color)
self.format.fill_rect(self, x_end, y, 1, y_end - y + 1, color)
def line(self, x_0, y_0, x_1, y_1, color):
# pylint: disable=too-many-arguments
"""Bresenham's line algorithm"""
d_x = abs(x_1 - x_0)
d_y = abs(y_1 - y_0)
x, y = x_0, y_0
s_x = -1 if x_0 > x_1 else 1
s_y = -1 if y_0 > y_1 else 1
if d_x > d_y:
err = d_x / 2.0
while x != x_1:
self.pixel(x, y, color)
err -= d_y
if err < 0:
y += s_y
err += d_x
x += s_x
else:
err = d_y / 2.0
while y != y_1:
self.pixel(x, y, color)
err -= d_x
if err < 0:
x += s_x
err += d_y
y += s_y
self.pixel(x, y, color)
def blit(self):
"""blit is not yet implemented"""
raise NotImplementedError()
def scroll(self, delta_x, delta_y):
"""shifts framebuf in x and y direction"""
if delta_x < 0:
shift_x = 0
xend = self.width + delta_x
dt_x = 1
else:
shift_x = self.width - 1
xend = delta_x - 1
dt_x = -1
if delta_y < 0:
y = 0
yend = self.height + delta_y
dt_y = 1
else:
y = self.height - 1
yend = delta_y - 1
dt_y = -1
while y != yend:
x = shift_x
while x != xend:
self.format.set_pixel(
self, x, y, self.format.get_pixel(self, x - delta_x, y - delta_y)
)
x += dt_x
y += dt_y
# pylint: disable=too-many-arguments
def text(self, string, x, y, color, *, font_name="font5x8.bin", size=1):
"""Place text on the screen in variables sizes. Breaks on \n to next line.
Does not break on line going off screen.
"""
# determine our effective width/height, taking rotation into account
frame_width = self.width
frame_height = self.height
if self.rotation == 1 or self.rotation == 3:
frame_width, frame_height = frame_height, frame_width
for chunk in string.split("\n"):
if not self._font or self._font.font_name != font_name:
# load the font!
self._font = BitmapFont(font_name)
width = self._font.font_width
height = self._font.font_height
for i, char in enumerate(chunk):
char_x = x + (i * (width + 1)) * size
if (
char_x + (width * size) > 0
and char_x < frame_width
and y + (height * size) > 0
and y < frame_height
):
self._font.draw_char(char, char_x, y, self, color, size=size)
y += height * size
# pylint: enable=too-many-arguments
def image(self, img):
"""Set buffer to value of Python Imaging Library image. The image should
be in 1 bit mode and a size equal to the display size."""
# determine our effective width/height, taking rotation into account
width = self.width
height = self.height
if self.rotation == 1 or self.rotation == 3:
width, height = height, width
if isinstance(self.format, RGB888Format) and img.mode != "RGB":
raise ValueError("Image must be in mode RGB.")
if isinstance(self.format, (MHMSBFormat, MVLSBFormat)) and img.mode != "1":
raise ValueError("Image must be in mode 1.")
imwidth, imheight = img.size
if imwidth != width or imheight != height:
raise ValueError(
"Image must be same dimensions as display ({0}x{1}).".format(
width, height
)
)
# Grab all the pixels from the image, faster than getpixel.
pixels = img.load()
# Clear buffer
for i in range(len(self.buf)):
self.buf[i] = 0
# Iterate through the pixels
for x in range(width): # yes this double loop is slow,
for y in range(height): # but these displays are small!
if img.mode == "RGB":
self.pixel(x, y, pixels[(x, y)])
elif pixels[(x, y)]:
self.pixel(x, y, 1) # only write if pixel is true
# MicroPython basic bitmap font renderer.
# Author: Tony DiCola
# License: MIT License (https://opensource.org/licenses/MIT)
class BitmapFont:
"""A helper class to read binary font tiles and 'seek' through them as a
file to display in a framebuffer. We use file access so we dont waste 1KB
of RAM on a font!"""
def __init__(self, font_name="font5x8.bin"):
# Specify the drawing area width and height, and the pixel function to
# call when drawing pixels (should take an x and y param at least).
# Optionally specify font_name to override the font file to use (default
# is font5x8.bin). The font format is a binary file with the following
# format:
# - 1 unsigned byte: font character width in pixels
# - 1 unsigned byte: font character height in pixels
# - x bytes: font data, in ASCII order covering all 255 characters.
# Each character should have a byte for each pixel column of
# data (i.e. a 5x8 font has 5 bytes per character).
self.font_name = font_name
# Open the font file and grab the character width and height values.
# Note that only fonts up to 8 pixels tall are currently supported.
try:
self._font_code=b'\x05\x08\x00\x00\x00\x00\x00>[O[>>kOk>\x1c>|>\x1c\x18<~<\x18\x1cW}W\x1c\x1c^\x7f^\x1c\x00\x18<\x18\x00\xff\xe7\xc3\xe7\xff\x00\x18$\x18\x00\xff\xe7\xdb\xe7\xff0H:\x06\x0e&)y)&@\x7f\x05\x05\x07@\x7f\x05%?Z<\xe7<Z\x7f>\x1c\x1c\x08\x08\x1c\x1c>\x7f\x14"\x7f"\x14__\x00__\x06\t\x7f\x01\x7f\x00f\x89\x95j`````\x94\xa2\xff\xa2\x94\x08\x04~\x04\x08\x10 ~ \x10\x08\x08*\x1c\x08\x08\x1c*\x08\x08\x1e\x10\x10\x10\x10\x0c\x1e\x0c\x1e\x0c08>80\x06\x0e>\x0e\x06\x00\x00\x00\x00\x00\x00\x00_\x00\x00\x00\x07\x00\x07\x00\x14\x7f\x14\x7f\x14$*\x7f*\x12#\x13\x08db6IV P\x00\x08\x07\x03\x00\x00\x1c"A\x00\x00A"\x1c\x00*\x1c\x7f\x1c*\x08\x08>\x08\x08\x00\x80p0\x00\x08\x08\x08\x08\x08\x00\x00``\x00 \x10\x08\x04\x02>QIE>\x00B\x7f@\x00rIIIF!AIM3\x18\x14\x12\x7f\x10\'EEE9<JII1A!\x11\t\x076III6FII)\x1e\x00\x00\x14\x00\x00\x00@4\x00\x00\x00\x08\x14"A\x14\x14\x14\x14\x14\x00A"\x14\x08\x02\x01Y\t\x06>A]YN|\x12\x11\x12|\x7fIII6>AAA"\x7fAAA>\x7fIIIA\x7f\t\t\t\x01>AAQs\x7f\x08\x08\x08\x7f\x00A\x7fA\x00 @A?\x01\x7f\x08\x14"A\x7f@@@@\x7f\x02\x1c\x02\x7f\x7f\x04\x08\x10\x7f>AAA>\x7f\t\t\t\x06>AQ!^\x7f\t\x19)F&III2\x03\x01\x7f\x01\x03?@@@?\x1f @ \x1f?@8@?c\x14\x08\x14c\x03\x04x\x04\x03aYIMC\x00\x7fAAA\x02\x04\x08\x10 \x00AAA\x7f\x04\x02\x01\x02\x04@@@@@\x00\x03\x07\x08\x00 TTx@\x7f(DD88DDD(8DD(\x7f8TTT\x18\x00\x08~\t\x02\x18\xa4\xa4\x9cx\x7f\x08\x04\x04x\x00D}@\x00 @@=\x00\x7f\x10(D\x00\x00A\x7f@\x00|\x04x\x04x|\x08\x04\x04x8DDD8\xfc\x18$$\x18\x18$$\x18\xfc|\x08\x04\x04\x08HTTT$\x04\x04?D$<@@ |\x1c @ \x1c<@0@<D(\x10(DL\x90\x90\x90|DdTLD\x00\x086A\x00\x00\x00w\x00\x00\x00A6\x08\x00\x02\x01\x02\x04\x02<&#&<\x1e\xa1\xa1a\x12:@@ z8TTUY!UUyA"TTxB!UTx@ TUy@\x0c\x1eRr\x129UUUY9TTTY9UTTX\x00\x00E|A\x00\x02E}B\x00\x01E|@}\x12\x11\x12}\xf0(%(\xf0|TUE\x00 TT|T|\n\t\x7fI2III2:DDD:2JHH0:AA!z:B@ x\x00\x9d\xa0\xa0}=BBB==@@@=<$\xff$$H~ICf+/\xfc/+\xff\t)\xf6 \xc0\x88~\t\x03 TTyA\x00\x00D}A0HHJ28@@"z\x00z\n\nr}\r\x191}&))/(&)))&0HM@ 8\x08\x08\x08\x08\x08\x08\x08\x088/\x10\xc8\xac\xba/\x10(4\xfa\x00\x00{\x00\x00\x08\x14*\x14""\x14*\x14\x08U\x00U\x00U\xaaU\xaaU\xaa\xffU\xffU\xff\x00\x00\x00\xff\x00\x10\x10\x10\xff\x00\x14\x14\x14\xff\x00\x10\x10\xff\x00\xff\x10\x10\xf0\x10\xf0\x14\x14\x14\xfc\x00\x14\x14\xf7\x00\xff\x00\x00\xff\x00\xff\x14\x14\xf4\x04\xfc\x14\x14\x17\x10\x1f\x10\x10\x1f\x10\x1f\x14\x14\x14\x1f\x00\x10\x10\x10\xf0\x00\x00\x00\x00\x1f\x10\x10\x10\x10\x1f\x10\x10\x10\x10\xf0\x10\x00\x00\x00\xff\x10\x10\x10\x10\x10\x10\x10\x10\x10\xff\x10\x00\x00\x00\xff\x14\x00\x00\xff\x00\xff\x00\x00\x1f\x10\x17\x00\x00\xfc\x04\xf4\x14\x14\x17\x10\x17\x14\x14\xf4\x04\xf4\x00\x00\xff\x00\xf7\x14\x14\x14\x14\x14\x14\x14\xf7\x00\xf7\x14\x14\x14\x17\x14\x10\x10\x1f\x10\x1f\x14\x14\x14\xf4\x14\x10\x10\xf0\x10\xf0\x00\x00\x1f\x10\x1f\x00\x00\x00\x1f\x14\x00\x00\x00\xfc\x14\x00\x00\xf0\x10\xf0\x10\x10\xff\x10\xff\x14\x14\x14\xff\x14\x10\x10\x10\x1f\x00\x00\x00\x00\xf0\x10\xff\xff\xff\xff\xff\xf0\xf0\xf0\xf0\xf0\xff\xff\xff\x00\x00\x00\x00\x00\xff\xff\x0f\x0f\x0f\x0f\x0f8DD8D\xfcJJJ4~\x02\x02\x06\x06\x02~\x02~\x02cUIAc8DD<\x04@~ \x1e \x06\x02~\x02\x02\x99\xa5\xe7\xa5\x99\x1c*I*\x1cLr\x01rL0JMM00HxH0\xbcbZF=>III\x00~\x01\x01\x01~*****DD_DD@QJD@@DJQ@\x00\x00\xff\x01\x03\xe0\x80\xff\x00\x00\x08\x08kk\x086\x126$6\x06\x0f\t\x0f\x06\x00\x00\x18\x18\x00\x00\x00\x10\x10\x000@\xff\x01\x01\x00\x1f\x01\x01\x1e\x00\x19\x1d\x17\x12\x00<<<<\x00\x00\x00\x00\x00'
self.font_width, self.font_height = struct.unpack("BB", b'\x05\x08')
# simple font file validation check based on expected file size
#if 2 + 256 * self.font_width != os.stat(font_name)[6]:
# raise RuntimeError("Invalid font file: " + font_name)
except OSError:
print("Could not find font file", font_name)
raise
except OverflowError:
# os.stat can throw this on boards without long int support
# just hope the font file is valid and press on
pass
def deinit(self):
"""Close the font file as cleanup."""
#self._font.close()
def __enter__(self):
"""Initialize/open the font file"""
self.__init__()
return self
def __exit__(self, exception_type, exception_value, traceback):
"""cleanup on exit"""
#self.deinit()
pass
def draw_char(
self, char, x, y, framebuffer, color, size=1
): # pylint: disable=too-many-arguments
"""Draw one character at position (x,y) to a framebuffer in a given color"""
size = max(size, 1)
# Don't draw the character if it will be clipped off the visible area.
# if x < -self.font_width or x >= framebuffer.width or \
# y < -self.font_height or y >= framebuffer.height:
# return
# Go through each column of the character.
for char_x in range(self.font_width):
# Grab the byte for the current column of font data.
#self._font.seek(2 + (ord(char) * self.font_width) + char_x)
chcode=bytes([self._font_code[(2 + (ord(char) * self.font_width) + char_x)]])
try:
line = struct.unpack("B", chcode)[0]
except RuntimeError:
continue # maybe character isnt there? go to next
# Go through each row in the column byte.
for char_y in range(self.font_height):
# Draw a pixel for each bit that's flipped on.
if (line >> char_y) & 0x1:
framebuffer.fill_rect(
x + char_x * size, y + char_y * size, size, size, color
)
def width(self, text):
"""Return the pixel width of the specified text message."""
return len(text) * (self.font_width + 1)
class FrameBuffer1(FrameBuffer): # pylint: disable=abstract-method
"""FrameBuffer1 object. Inherits from FrameBuffer."""

View File

@ -0,0 +1,283 @@
# The MIT License (MIT)
#
# Copyright (c) 2017 Scott Shawcroft for Adafruit Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
`adafruit_irremote`
====================================================
Demo code for Circuit Playground Express:
.. code-block:: python
# Circuit Playground Express Demo Code
# Adjust the pulseio 'board.PIN' if using something else
import pulseio
import board
import adafruit_irremote
pulsein = pulseio.PulseIn(board.REMOTEIN, maxlen=120, idle_state=True)
decoder = adafruit_irremote.GenericDecode()
while True:
pulses = decoder.read_pulses(pulsein)
print("Heard", len(pulses), "Pulses:", pulses)
try:
code = decoder.decode_bits(pulses)
print("Decoded:", code)
except adafruit_irremote.IRNECRepeatException: # unusual short code!
print("NEC repeat!")
except adafruit_irremote.IRDecodeException as e: # failed to decode
print("Failed to decode: ", e.args)
print("----------------------------")
* Author(s): Scott Shawcroft
Implementation Notes
--------------------
**Hardware:**
* `CircuitPlayground Express <https://www.adafruit.com/product/3333>`_
* `IR Receiver Sensor <https://www.adafruit.com/product/157>`_
**Software and Dependencies:**
* Adafruit CircuitPython firmware for the ESP8622 and M0-based boards:
https://github.com/adafruit/circuitpython/releases
"""
# Pretend self matter because we may add object level config later.
# pylint: disable=no-self-use
import array
import time
__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_IRRemote.git"
class IRDecodeException(Exception):
"""Generic decode exception"""
class IRNECRepeatException(Exception):
"""Exception when a NEC repeat is decoded"""
class GenericDecode:
"""Generic decoding of infrared signals"""
def bin_data(self, pulses):
"""Compute bins of pulse lengths where pulses are +-25% of the average.
:param list pulses: Input pulse lengths
"""
bins = [[pulses[0], 0]]
for _, pulse in enumerate(pulses):
matchedbin = False
# print(pulse, end=": ")
for b, pulse_bin in enumerate(bins):
if pulse_bin[0] * 0.75 <= pulse <= pulse_bin[0] * 1.25:
# print("matches bin")
bins[b][0] = (pulse_bin[0] + pulse) // 2 # avg em
bins[b][1] += 1 # track it
matchedbin = True
break
if not matchedbin:
bins.append([pulse, 1])
# print(bins)
return bins
def decode_bits(self, pulses):
"""Decode the pulses into bits."""
# pylint: disable=too-many-branches,too-many-statements
# special exception for NEC repeat code!
if (
(len(pulses) == 3)
and (8000 <= pulses[0] <= 10000)
and (2000 <= pulses[1] <= 3000)
and (450 <= pulses[2] <= 700)
):
raise IRNECRepeatException()
if len(pulses) < 10:
raise IRDecodeException("10 pulses minimum")
# Ignore any header (evens start at 1), and any trailer.
if len(pulses) % 2 == 0:
pulses_end = -1
else:
pulses_end = None
evens = pulses[1:pulses_end:2]
odds = pulses[2:pulses_end:2]
# bin both halves
even_bins = self.bin_data(evens)
odd_bins = self.bin_data(odds)
outliers = [b[0] for b in (even_bins + odd_bins) if b[1] == 1]
even_bins = [b for b in even_bins if b[1] > 1]
odd_bins = [b for b in odd_bins if b[1] > 1]
if not even_bins or not odd_bins:
raise IRDecodeException("Not enough data")
if len(even_bins) == 1:
pulses = odds
pulse_bins = odd_bins
elif len(odd_bins) == 1:
pulses = evens
pulse_bins = even_bins
else:
raise IRDecodeException("Both even/odd pulses differ")
if len(pulse_bins) == 1:
raise IRDecodeException("Pulses do not differ")
if len(pulse_bins) > 2:
raise IRDecodeException("Only mark & space handled")
mark = min(pulse_bins[0][0], pulse_bins[1][0])
space = max(pulse_bins[0][0], pulse_bins[1][0])
if outliers:
# skip outliers
pulses = [
p
for p in pulses
if not (outliers[0] * 0.75) <= p <= (outliers[0] * 1.25)
]
# convert marks/spaces to 0 and 1
for i, pulse_length in enumerate(pulses):
if (space * 0.75) <= pulse_length <= (space * 1.25):
pulses[i] = False
elif (mark * 0.75) <= pulse_length <= (mark * 1.25):
pulses[i] = True
else:
raise IRDecodeException("Pulses outside mark/space")
# convert bits to bytes!
output = [0] * ((len(pulses) + 7) // 8)
for i, pulse_length in enumerate(pulses):
output[i // 8] = output[i // 8] << 1
if pulse_length:
output[i // 8] |= 1
return output
def _read_pulses_non_blocking(
self, input_pulses, max_pulse=10000, pulse_window=0.10
):
"""Read out a burst of pulses without blocking until pulses stop for a specified
period (pulse_window), pruning pulses after a pulse longer than ``max_pulse``.
:param ~pulseio.PulseIn input_pulses: Object to read pulses from
:param int max_pulse: Pulse duration to end a burst
:param float pulse_window: pulses are collected for this period of time
"""
received = None
recent_count = 0
pruning = False
while True:
while input_pulses:
pulse = input_pulses.popleft()
recent_count += 1
if pulse > max_pulse:
if received is None:
continue
pruning = True
if not pruning:
if received is None:
received = []
received.append(pulse)
if recent_count == 0:
return received
recent_count = 0
time.sleep(pulse_window)
def read_pulses(
self,
input_pulses,
*,
max_pulse=10000,
blocking=True,
pulse_window=0.10,
blocking_delay=0.10
):
"""Read out a burst of pulses until pulses stop for a specified
period (pulse_window), pruning pulses after a pulse longer than ``max_pulse``.
:param ~pulseio.PulseIn input_pulses: Object to read pulses from
:param int max_pulse: Pulse duration to end a burst
:param bool blocking: If True, will block until pulses found.
If False, will return None if no pulses.
Defaults to True for backwards compatibility
:param float pulse_window: pulses are collected for this period of time
:param float blocking_delay: delay between pulse checks when blocking
"""
while True:
pulses = self._read_pulses_non_blocking(
input_pulses, max_pulse, pulse_window
)
if blocking and pulses is None:
time.sleep(blocking_delay)
continue
return pulses
class GenericTransmit:
"""Generic infrared transmit class that handles encoding."""
def __init__(self, header, one, zero, trail):
self.header = header
self.one = one
self.zero = zero
self.trail = trail
def transmit(self, pulseout, data):
"""Transmit the ``data`` using the ``pulseout``.
:param pulseio.PulseOut pulseout: PulseOut to transmit on
:param bytearray data: Data to transmit
"""
durations = array.array("H", [0] * (2 + len(data) * 8 * 2 + 1))
durations[0] = self.header[0]
durations[1] = self.header[1]
durations[-1] = self.trail
out = 2
for byte_index, _ in enumerate(data):
for i in range(7, -1, -1):
if (data[byte_index] & 1 << i) > 0:
durations[out] = self.one[0]
durations[out + 1] = self.one[1]
else:
durations[out] = self.zero[0]
durations[out + 1] = self.zero[1]
out += 2
# print(durations)
pulseout.send(durations)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,204 @@
# The MIT License (MIT)
#
# Copyright (c) 2017, 2018 Scott Shawcroft for Adafruit Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
`adafruit_rtttl`
====================================================
Play notes to a digialio pin using ring tone text transfer language (rtttl).
* Author(s): Scott Shawcroft
"""
__version__ = "2.4.2"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RTTTL"
import sys
import time
import pwmio
AUDIOIO_AVAILABLE = False
try:
import audioio
from adafruit_waveform import sine
AUDIOIO_AVAILABLE = True
try:
import audiocore
except ImportError:
audiocore = audioio
except ImportError:
pass
PIANO = {
"4c": 261.626,
"4c#": 277.183,
"4d": 293.665,
"4d#": 311.127,
"4e": 329.628,
"4f": 349.228,
"4f#": 369.994,
"4g": 391.995,
"4g#": 415.305,
"4a": 440,
"4a#": 466.164,
"4b": 493.883,
"5c": 523.251,
"5c#": 554.365,
"5d": 587.330,
"5d#": 622.254,
"5e": 659.255,
"5f": 698.456,
"5f#": 739.989,
"5g": 783.991,
"5g#": 830.609,
"5a": 880,
"5a#": 932.328,
"5b": 987.767,
"6c": 1046.50,
"6c#": 1108.73,
"6d": 1174.66,
"6d#": 1244.51,
"6e": 1318.51,
"6f": 1396.91,
"6f#": 1479.98,
"6g": 1567.98,
"6g#": 1661.22,
"6a": 1760,
"6a#": 1864.66,
"6b": 1975.53,
"7c": 2093,
"7c#": 2217.46,
}
def _parse_note(note, duration=2, octave="6"):
note = note.strip()
piano_note = None
note_duration = duration
if note[0].isdigit() and note[1].isdigit():
note_duration = int(note[:2])
piano_note = note[2]
elif note[0].isdigit():
note_duration = int(note[0])
piano_note = note[1]
else:
piano_note = note[0]
if "." in note:
note_duration *= 1.5
if "#" in note:
piano_note += "#"
note_octave = octave
if note[-1].isdigit():
note_octave = note[-1]
piano_note = note_octave + piano_note
return piano_note, note_duration
def _get_wave(tune, octave):
"""Returns the proper waveform to play the song along with the minimum
frequency in the song.
"""
min_freq = 13000
for note in tune.split(","):
piano_note, _ = _parse_note(note, octave=octave)
if piano_note[-1] != "p" and PIANO[piano_note] < min_freq:
min_freq = PIANO[piano_note]
return sine.sine_wave(16000, min_freq), min_freq
# pylint: disable-msg=too-many-arguments
def _play_to_pin(tune, base_tone, min_freq, duration, octave, tempo):
"""Using the prepared input send the notes to the pin
"""
pwm = isinstance(base_tone, pwmio.PWMOut)
for note in tune.split(","):
piano_note, note_duration = _parse_note(note, duration, octave)
if piano_note in PIANO:
if pwm:
base_tone.frequency = int(PIANO[piano_note])
base_tone.duty_cycle = 2 ** 15
else:
# AudioOut interface changed in CP 3.x
if sys.implementation.version[0] >= 3:
pitch = int(PIANO[piano_note])
sine_wave = sine.sine_wave(16000, pitch)
sine_wave_sample = audiocore.RawSample(sine_wave)
base_tone.play(sine_wave_sample, loop=True)
else:
base_tone.frequency = int(16000 * (PIANO[piano_note] / min_freq))
base_tone.play(loop=True)
time.sleep(4 / note_duration * 60 / tempo)
if pwm:
base_tone.duty_cycle = 0
else:
base_tone.stop()
time.sleep(0.02)
# pylint: disable-msg=too-many-arguments
def play(pin, rtttl, octave=None, duration=None, tempo=None):
"""Play notes to a digialio pin using ring tone text transfer language (rtttl).
:param ~digitalio.DigitalInOut pin: the speaker pin
:param rtttl: string containing rtttl
:param int octave: represents octave number (default 6 starts at middle c)
:param int duration: length of notes (default 4 quarter note)
:param int tempo: how fast (default 63 beats per minute)
"""
_, defaults, tune = rtttl.lower().split(":")
for default in defaults.split(","):
if default[0] == "d" and not duration:
duration = int(default[2:])
elif default[0] == "o" and not octave:
octave = default[2:]
elif default[0] == "b" and not tempo:
tempo = int(default[2:])
if not octave:
octave = 6
if not duration:
duration = 4
if not tempo:
tempo = 63
base_tone = None
min_freq = 440
if AUDIOIO_AVAILABLE:
wave, min_freq = _get_wave(tune, octave)
try:
# AudioOut interface changed in CP 3.x; a waveform if now pass
# directly to .play(), generated for each note in _play_to_pin()
if sys.implementation.version[0] >= 3:
base_tone = audioio.AudioOut(pin)
else:
base_tone = audioio.AudioOut(pin, wave)
except ValueError:
# No DAC on the pin so use PWM.
pass
# Fall back to PWM
if not base_tone:
base_tone = pwmio.PWMOut(pin, duty_cycle=0, variable_frequency=True)
_play_to_pin(tune, base_tone, min_freq, duration, octave, tempo)
base_tone.deinit()

View File

@ -0,0 +1,390 @@
# Copyright (c) 2019-2020 Anton Morozenko
# Copyright (c) 2015-2019 Volodymyr Shymanskyy.
# See the file LICENSE for copying permission.
# 宋义深 2021/4/25 Modified
__version__ = '0.2.6'
import socketpool as socket
import ssl
import struct
import time
import wifi
LOGO = """
___ __ __
/ _ )/ /_ _____ / /__
/ _ / / // / _ \\/ '_/
/____/_/\\_, /_//_/_/\\_\\
/___/ for Python v{}\n""".format(__version__)
def stub_log(*args):
pass
def ticks_ms():
return int(time.time() * 1000)
def sleep_ms(ms):
time.sleep(ms // 1000)
class BlynkError(Exception):
pass
class RedirectError(Exception):
def __init__(self, server, port):
self.server = server
self.port = port
class Protocol(object):
MSG_RSP = 0
MSG_LOGIN = 2
MSG_PING = 6
MSG_TWEET = 12
MSG_EMAIL = 13
MSG_NOTIFY = 14
MSG_BRIDGE = 15
MSG_HW_SYNC = 16
MSG_INTERNAL = 17
MSG_PROPERTY = 19
MSG_HW = 20
MSG_REDIRECT = 41
MSG_HEAD_LEN = 5
STATUS_INVALID_TOKEN = 9
STATUS_NO_DATA = 17
STATUS_OK = 200
VPIN_MAX_NUM = 32
_msg_id = 0
def _get_msg_id(self, **kwargs):
if 'msg_id' in kwargs:
return kwargs['msg_id']
self._msg_id += 1
return self._msg_id if self._msg_id <= 0xFFFF else 1
def _pack_msg(self, msg_type, *args, **kwargs):
data = ('\0'.join([str(curr_arg) for curr_arg in args])).encode('utf-8')
return struct.pack('!BHH', msg_type, self._get_msg_id(**kwargs), len(data)) + data
def parse_response(self, rsp_data, msg_buffer):
msg_args = []
try:
msg_type, msg_id, h_data = struct.unpack('!BHH', rsp_data[:self.MSG_HEAD_LEN])
except Exception as p_err:
print(p_err)
raise BlynkError('Message parse error: {}'.format(p_err))
if msg_id == 0:
raise BlynkError('invalid msg_id == 0')
elif h_data >= msg_buffer:
raise BlynkError('Command too long. Length = {}'.format(h_data))
elif msg_type in (self.MSG_RSP, self.MSG_PING):
pass
elif msg_type in (self.MSG_HW, self.MSG_BRIDGE, self.MSG_INTERNAL, self.MSG_REDIRECT):
msg_body = rsp_data[self.MSG_HEAD_LEN: self.MSG_HEAD_LEN + h_data]
msg_args = [itm.decode('utf-8') for itm in msg_body.split(b'\0')]
else:
print('unknown')
raise BlynkError("Unknown message type: '{}'".format(msg_type))
return msg_type, msg_id, h_data, msg_args
def heartbeat_msg(self, heartbeat, rcv_buffer):
return self._pack_msg(self.MSG_INTERNAL, 'ver', __version__, 'buff-in', rcv_buffer, 'h-beat', heartbeat,
'dev', 'python')
def login_msg(self, token):
return self._pack_msg(self.MSG_LOGIN, token)
def ping_msg(self):
return self._pack_msg(self.MSG_PING)
def response_msg(self, *args, **kwargs):
return self._pack_msg(self.MSG_RSP, *args, **kwargs)
def virtual_write_msg(self, v_pin, *val):
return self._pack_msg(self.MSG_HW, 'vw', v_pin, *val)
def virtual_sync_msg(self, *pins):
return self._pack_msg(self.MSG_HW_SYNC, 'vr', *pins)
def email_msg(self, to, subject, body):
return self._pack_msg(self.MSG_EMAIL, to, subject, body)
def tweet_msg(self, msg):
return self._pack_msg(self.MSG_TWEET, msg)
def notify_msg(self, msg):
return self._pack_msg(self.MSG_NOTIFY, msg)
def set_property_msg(self, pin, prop, *val):
return self._pack_msg(self.MSG_PROPERTY, pin, prop, *val)
def internal_msg(self, *args):
return self._pack_msg(self.MSG_INTERNAL, *args)
class Connection(Protocol):
SOCK_MAX_TIMEOUT = 5
SOCK_TIMEOUT = 0
SOCK_SSL_TIMEOUT = 1
EAGAIN = 11
ETIMEDOUT = 60
RETRIES_TX_DELAY = 2
RETRIES_TX_MAX_NUM = 3
RECONNECT_SLEEP = 1
TASK_PERIOD_RES = 50
DISCONNECTED = 0
CONNECTING = 1
AUTHENTICATING = 2
AUTHENTICATED = 3
_state = None
_socket = None
_socketPool = None
_last_rcv_time = 0
_last_ping_time = 0
_last_send_time = 0
def __init__(self, token, server='blynk-cloud.com', port=80, ssl_cert=None, heartbeat=10, rcv_buffer=1024,
log=stub_log):
self.token = token
self.server = server
self.port = port
self.heartbeat = heartbeat
self.rcv_buffer = rcv_buffer
self.log = log
self.ssl_cert = ssl_cert
def send(self, data):
retries = self.RETRIES_TX_MAX_NUM
while retries > 0:
try:
retries -= 1
self._last_send_time = ticks_ms()
return self._socket.send(data)
except (Exception):
sleep_ms(self.RETRIES_TX_DELAY)
def receive(self, length, timeout):
d_buff = bytearray(length)
try:
self._socket.settimeout(timeout)
recv_length = self._socket.recv_into(d_buff)
d_buff = d_buff[:recv_length]
#print(self.parse_response(bytes(d_buff), self.rcv_buffer))
return bytes(d_buff)
except (Exception) as err:
if 'timed out' in str(err):
return b''
if str(self.EAGAIN) in str(err) or str(self.ETIMEDOUT) in str(err):
return b''
raise
def is_server_alive(self):
now = ticks_ms()
h_beat_ms = self.heartbeat * 1000
rcv_delta = now - self._last_rcv_time
ping_delta = now - self._last_ping_time
send_delta = now - self._last_send_time
if rcv_delta > h_beat_ms + (h_beat_ms // 2):
return False
if (ping_delta > h_beat_ms // 10) and (send_delta > h_beat_ms or rcv_delta > h_beat_ms):
self.send(self.ping_msg())
self.log('Heartbeat time: {}'.format(now))
self._last_ping_time = now
return True
def _get_socket(self):
try:
self._state = self.CONNECTING
self._socket = socket.SocketPool(wifi.radio).socket()
self._socketPool = socket.SocketPool(wifi.radio)
self._socket.connect(self._socketPool.getaddrinfo(self.server, self.port)[0][4])
self._socket.settimeout(self.SOCK_TIMEOUT)
if self.ssl_cert:
# system default CA certificates case
if self.ssl_cert == "default":
self.ssl_cert = None
self.log('Using SSL socket...')
ssl_context = ssl.create_default_context(cafile=self.ssl_cert)
ssl_context.verify_mode = ssl.CERT_REQUIRED
self._socket.settimeout(self.SOCK_SSL_TIMEOUT)
self._socket = ssl_context.wrap_socket(sock=self._socket, server_hostname=self.server)
self.log('Connected to blynk server')
except Exception as g_exc:
print(g_exc)
raise BlynkError('Connection with the Blynk server failed: {}'.format(g_exc))
def _authenticate(self):
self._state = self.AUTHENTICATING
self.send(self.login_msg(self.token))
rsp_data = self.receive(self.rcv_buffer, self.SOCK_MAX_TIMEOUT)
if not rsp_data:
raise BlynkError('Auth stage timeout')
msg_type, _, status, args = self.parse_response(rsp_data, self.rcv_buffer)
if status != self.STATUS_OK:
if status == self.STATUS_INVALID_TOKEN:
raise BlynkError('Invalid Auth Token')
if msg_type == self.MSG_REDIRECT:
raise RedirectError(*args)
raise BlynkError('Auth stage failed. Status={}'.format(status))
self._state = self.AUTHENTICATED
def _set_heartbeat(self):
self.send(self.heartbeat_msg(self.heartbeat, self.rcv_buffer))
rcv_data = self.receive(self.rcv_buffer, self.SOCK_MAX_TIMEOUT)
if not rcv_data:
raise BlynkError('Heartbeat stage timeout')
_, _, status, _ = self.parse_response(rcv_data, self.rcv_buffer)
if status != self.STATUS_OK:
raise BlynkError('Set heartbeat returned code={}'.format(status))
self.log('Heartbeat = {} sec. MaxCmdBuffer = {} bytes'.format(self.heartbeat, self.rcv_buffer))
def connected(self):
return True if self._state == self.AUTHENTICATED else False
class Blynk(Connection):
_CONNECT_TIMEOUT = 30 # 30sec
_VPIN_WILDCARD = '*'
_VPIN_READ = 'read v'
_VPIN_WRITE = 'write v'
_INTERNAL = 'internal_'
_CONNECT = 'connect'
_DISCONNECT = 'disconnect'
_VPIN_READ_ALL = '{}{}'.format(_VPIN_READ, _VPIN_WILDCARD)
_VPIN_WRITE_ALL = '{}{}'.format(_VPIN_WRITE, _VPIN_WILDCARD)
_events = {}
def __init__(self, token, **kwargs):
Connection.__init__(self, token, **kwargs)
self._start_time = ticks_ms()
self._last_rcv_time = ticks_ms()
self._last_send_time = ticks_ms()
self._last_ping_time = ticks_ms()
self._state = self.DISCONNECTED
print(LOGO)
def connect(self, timeout=_CONNECT_TIMEOUT):
end_time = time.time() + timeout
while not self.connected():
if self._state == self.DISCONNECTED:
try:
self._get_socket()
print('[Connecting 1/5] Socket got.')
self._authenticate()
print('[Connecting 2/5] Authenticated.')
self._set_heartbeat()
print('[Connecting 3/5] Heartbeat sent.')
self._last_rcv_time = ticks_ms()
print('[Connecting 4/5] Last receive time set.')
self.log('Registered events: {}\n'.format(list(self._events.keys())))
print('[Connecting 5/5] Events registered.')
self.call_handler(self._CONNECT)
return True
except BlynkError as b_err:
self.disconnect(b_err)
sleep_ms(self.TASK_PERIOD_RES)
except RedirectError as r_err:
self.disconnect()
self.server = r_err.server
self.port = r_err.port
sleep_ms(self.TASK_PERIOD_RES)
if time.time() >= end_time:
return False
def disconnect(self, err_msg=None):
self.call_handler(self._DISCONNECT)
if self._socket:
self._socket.close()
self._state = self.DISCONNECTED
if err_msg:
self.log('[ERROR]: {}\nConnection closed'.format(err_msg))
self._msg_id = 0
time.sleep(self.RECONNECT_SLEEP)
def virtual_write(self, v_pin, *val):
return self.send(self.virtual_write_msg(v_pin, *val))
def virtual_sync(self, *v_pin):
return self.send(self.virtual_sync_msg(*v_pin))
def email(self, to, subject, body):
return self.send(self.email_msg(to, subject, body))
def tweet(self, msg):
return self.send(self.tweet_msg(msg))
def notify(self, msg):
return self.send(self.notify_msg(msg))
def set_property(self, v_pin, property_name, *val):
return self.send(self.set_property_msg(v_pin, property_name, *val))
def internal(self, *args):
return self.send(self.internal_msg(*args))
def handle_event(blynk, event_name):
class Deco(object):
def __init__(self, func):
self.func = func
# wildcard 'read V*' and 'write V*' events handling
if str(event_name).lower() in (blynk._VPIN_READ_ALL, blynk._VPIN_WRITE_ALL):
event_base_name = str(event_name).split(blynk._VPIN_WILDCARD)[0]
for i in range(blynk.VPIN_MAX_NUM + 1):
blynk._events['{}{}'.format(event_base_name.lower(), i)] = func
else:
blynk._events[str(event_name).lower()] = func
def __call__(self):
return self.func()
return Deco
def call_handler(self, event, *args, **kwargs):
if event in self._events.keys():
self.log("Event: ['{}'] -> {}".format(event, args))
self._events[event](*args, **kwargs)
def process(self, msg_type, msg_id, msg_len, msg_args):
if msg_type == self.MSG_RSP:
self.log('Response status: {}'.format(msg_len))
elif msg_type == self.MSG_PING:
self.send(self.response_msg(self.STATUS_OK, msg_id=msg_id))
elif msg_type in (self.MSG_HW, self.MSG_BRIDGE, self.MSG_INTERNAL):
if msg_type == self.MSG_INTERNAL:
self.call_handler("{}{}".format(self._INTERNAL, msg_args[0]), msg_args[1:])
elif len(msg_args) >= 3 and msg_args[0] == 'vw':
self.call_handler("{}{}".format(self._VPIN_WRITE, msg_args[1]), int(msg_args[1]), msg_args[2:])
elif len(msg_args) == 2 and msg_args[0] == 'vr':
self.call_handler("{}{}".format(self._VPIN_READ, msg_args[1]), int(msg_args[1]))
def read_response(self, timeout=0.5):
end_time = time.time() + timeout
while time.time() <= end_time:
rsp_data = self.receive(self.rcv_buffer, self.SOCK_TIMEOUT)
if rsp_data:
self._last_rcv_time = ticks_ms()
msg_type, msg_id, h_data, msg_args = self.parse_response(rsp_data, self.rcv_buffer)
self.process(msg_type, msg_id, h_data, msg_args)
def run(self):
if not self.connected():
self.connect()
else:
try:
self.read_response(timeout=self.SOCK_TIMEOUT)
if not self.is_server_alive():
self.disconnect('Blynk server is offline')
except KeyboardInterrupt:
raise
except BlynkError as b_err:
self.log(b_err)
self.disconnect()
except Exception as g_exc:
self.log(g_exc)

View File

@ -0,0 +1,134 @@
# Copyright (c) 2019-2020 Anton Morozenko
# 宋义深 2021/4/25 Modified
"""
Polling timers for functions.
Registers timers and performs run once or periodical function execution after defined time intervals.
"""
# select.select call used as polling waiter where it is possible
# cause time.sleep sometimes may load CPU up to 100% with small polling wait interval
try:
# cpython
import time
import select
polling_wait = lambda x: select.select([], [], [], x)
polling_wait(0.01)
except OSError:
# windows case where select.select call fails
polling_wait = lambda x: time.sleep(x)
except ImportError:
# micropython
import time
try:
from uselect import select as s_select
polling_wait = lambda x: s_select([], [], [], x)
except ImportError:
# case when micropython port does not support select.select
polling_wait = lambda x: time.sleep(x)
WAIT_SEC = 0.05
MAX_TIMERS = 16
DEFAULT_INTERVAL = 10
class TimerError(Exception):
pass
class Timer(object):
timers = {}
def __init__(self, no_timers_err=True):
self.no_timers_err = no_timers_err
def _get_func_name(self, obj):
"""retrieves a suitable name for a function"""
if hasattr(obj, 'func'):
# handles nested decorators
return self._get_func_name(obj.func)
# simply returns 'timer' if on port without function attrs
return getattr(obj, '__name__', 'timer')
def register(blynk, *args, **kwargs):
# kwargs with defaults are used cause PEP 3102 no supported by Python2
interval = kwargs.pop('interval', DEFAULT_INTERVAL)
run_once = kwargs.pop('run_once', False)
stopped = kwargs.pop('stopped', False)
class Deco(object):
def __init__(self, func):
self.func = func
if len(list(Timer.timers.keys())) >= MAX_TIMERS:
raise TimerError('Max allowed timers num={}'.format(MAX_TIMERS))
_timer = _Timer(interval, func, run_once, stopped, *args, **kwargs)
Timer.timers['{}_{}'.format(len(list(Timer.timers.keys())), blynk._get_func_name(func))] = _timer
def __call__(self, *f_args, **f_kwargs):
return self.func(*f_args, **f_kwargs)
return Deco
@staticmethod
def stop(t_id):
timer = Timer.timers.get(t_id, None)
if timer is None:
raise TimerError('Timer id={} not found'.format(t_id))
Timer.timers[t_id].stopped = True
@staticmethod
def start(t_id):
timer = Timer.timers.get(t_id, None)
if timer is None:
raise TimerError('Timer id={} not found'.format(t_id))
Timer.timers[t_id].stopped = False
Timer.timers[t_id].fire_time = None
Timer.timers[t_id].fire_time_prev = None
@staticmethod
def is_stopped(t_id):
timer = Timer.timers.get(t_id, None)
if timer is None:
raise TimerError('Timer id={} not found'.format(t_id))
return timer.stopped
def get_timers(self):
states = {True: 'Stopped', False: 'Running'}
return {k: states[v.stopped] for k, v in self.timers.items()}
def run(self):
polling_wait(WAIT_SEC)
timers_intervals = [curr_timer.run() for curr_timer in Timer.timers.values() if not curr_timer.stopped]
if not timers_intervals and self.no_timers_err:
raise TimerError('Running timers not found')
return timers_intervals
class _Timer(object):
def __init__(self, interval, deco, run_once, stopped, *args, **kwargs):
self.interval = interval
self.deco = deco
self.args = args
self.run_once = run_once
self.kwargs = kwargs
self.fire_time = None
self.fire_time_prev = None
self.stopped = stopped
def run(self):
timer_real_interval = 0
if self.fire_time is None:
self.fire_time = time.time() + self.interval
if self.fire_time_prev is None:
self.fire_time_prev = time.time()
curr_time = time.time()
if curr_time >= self.fire_time:
self.deco(*self.args, **self.kwargs)
if self.run_once:
self.stopped = True
timer_real_interval = curr_time - self.fire_time_prev
self.fire_time_prev = self.fire_time
self.fire_time = curr_time + self.interval
return timer_real_interval

View File

@ -0,0 +1,49 @@
"""
Button
CircuitPython library for Button - MixGoCE
=======================================================
Small Cabbage
20210721
dahanzimin
20210423
"""
import time
import board
from digitalio import DigitalInOut, Direction, Pull
class Button():
def __init__(self, pin):
self.pin = DigitalInOut(pin)
self.pin.direction = Direction.INPUT
self.pin.pull = Pull.UP
self.flag = True
def is_pressed(self):
return self.pin.value == False
def get_presses(self, delay=1):
last_time, presses = time.monotonic(), 0
while time.monotonic()< last_time + delay:
time.sleep(0.05)
if self.was_pressed():
presses += 1
return presses
def was_pressed(self):
if self.pin.value != self.flag:
self.flag = self.pin.value
time.sleep(0.01)
if self.flag == False:
return True
else:
return False
button_A1 = Button(board.IO14)
button_A2 = Button(board.IO21)
button_A3 = Button(board.IO36)
button_A4 = Button(board.IO37)
button_B1 = Button(board.IO0)
button_B2 = Button(board.IO35)

View File

@ -0,0 +1,41 @@
"""
Infrared Sensor
CircuitPython library for Infrared Sensor - MixGoCE
=======================================================
Small Cabbage
20210721
dahanzimin
20210423
"""
from mixgoce import version
import board
from pwmio import PWMOut
if version:#new
def near(x, val=10000):
from analogio import AnalogIn
pwm = PWMOut(board.IO39, frequency=50000, duty_cycle=65535)
if x == 'left':
IR=AnalogIn(board.IO3)
reaction = IR.value-10000 #fix
if x == 'right':
IR=AnalogIn(board.IO16)
reaction = IR.value
IR.deinit()
pwm.deinit()
return reaction > val
else:#old
def near(x, val=10000):
from digitalio import DigitalInOut, Direction
IR = DigitalInOut(board.IO38)
IR.direction = Direction.INPUT
pwm = PWMOut(board.IO39, frequency=53000, duty_cycle=100)
reaction = IR.value
IR.deinit()
pwm.deinit()
return reaction

View File

@ -0,0 +1,111 @@
"""
IR Remote
CircuitPython library for IR Remote - MixGoCE
=======================================================
Small Cabbage
20210811
"""
import pulseio
import board
import adafruit_irremote
first_init =True
def encode2hex(code):
res = "0X"
for c in code:
c = hex(c)[2:]
if len(c) == 1:
c = "0" + c
res = res + c.upper()
return res
def decode2list(string):
res = [0, 0, 0, 0]
prefix = "0" * (8 - len(string[2:]))
string = prefix + string[2:]
for i in range(4):
res[i] = int(string[2*i:2*(i+1)], 16)
return res
def ir_receive_hex(pin=board.IO38, flag=False):
global first_init
global decoder
global pulsein
if first_init :
pulsein = pulseio.PulseIn(pin, maxlen=120, idle_state=True)
decoder = adafruit_irremote.GenericDecode()
first_init=False
pulses = decoder.read_pulses(pulsein, blocking=False)
if pulses:
try:
# print("Heard", len(pulses), "Pulses:", pulses)
code = decoder.decode_bits(pulses)
res = encode2hex(code)
if flag:
print("Decoded:", res)
# pulsein.deinit()
return res
except adafruit_irremote.IRNECRepeatException: # unusual short code!
print("NEC repeat!")
except adafruit_irremote.IRDecodeException as e: # failed to decode
if flag:
print("Failed to decode: ", e.args)
# except TypeError as e:
# pass
# finally:
# pulsein.deinit()
def ir_receive(pin=board.IO38,flag=False):
global first_init
global decoder
global pulsein
if first_init :
pulsein = pulseio.PulseIn(pin, maxlen=120, idle_state=True)
decoder = adafruit_irremote.GenericDecode()
first_init=False
pulses = decoder.read_pulses(pulsein, blocking=False)
if pulses:
try:
code = decoder.decode_bits(pulses)
if flag:
print("Decoded:", code)
return code
except adafruit_irremote.IRNECRepeatException: # unusual short code!
print("NEC repeat!")
except adafruit_irremote.IRDecodeException as e: # failed to decode
if flag:
print("Failed to decode: ", e.args)
def ir_send(content=[255, 0, 0, 0], pin=board.IO39):
pulseout = pulseio.PulseOut(pin, frequency=38000, duty_cycle=2 ** 15)
# Create an encoder that will take numbers and turn them into NEC IR pulses
encoder = adafruit_irremote.GenericTransmit(
header=[9500, 4500], one=[550, 550], zero=[550, 1700], trail=550
)
encoder.transmit(pulseout, content)
print("IR signal sent!")
pulseout.deinit()
def ir_send_hex(content="0XFF000000", pin=board.IO39): # [255, 0, 0, 0]
pulseout = pulseio.PulseOut(pin, frequency=38000, duty_cycle=2 ** 15)
# Create an encoder that will take numbers and turn them into NEC IR pulses
encoder = adafruit_irremote.GenericTransmit(
header=[9500, 4500], one=[550, 550], zero=[550, 1700], trail=550
)
encoder.transmit(pulseout, decode2list(content))
print("IR signal sent!")
pulseout.deinit()

View File

@ -0,0 +1,34 @@
"""
Led
CircuitPython library for Led - MixGoCE
=======================================================
Small Cabbage
20210721
dahanzimin
20210423
"""
import board
from pwmio import PWMOut
class Led(object): #fix
def __init__(self, pin):
self.pin = PWMOut(pin, frequency=5000, duty_cycle=65535)
def setbrightness(self, val):
self.pin.duty_cycle = 65535 - val
def setonoff(self, val):
if val == 1:
self.pin.duty_cycle = 0
elif val == 0:
self.pin.duty_cycle = 65535
else:
self.pin.duty_cycle = 65535 - self.pin.duty_cycle
def getonoff(self):
return (65535 - self.pin.duty_cycle) // 65535
led_L1 = Led(board.IO33)
led_L2 = Led(board.IO34)

View File

@ -0,0 +1,97 @@
# SPDX-FileCopyrightText: 2017 Yoch <https://github.com/yoch>
#
# SPDX-License-Identifier: EPL-1.0
"""
`matcher`
====================================================================================
MQTT topic filter matcher from the Eclipse Project's Paho.MQTT.Python
https://github.com/eclipse/paho.mqtt.python/blob/master/src/paho/mqtt/matcher.py
* Author(s): Yoch (https://github.com/yoch)
"""
class MQTTMatcher:
"""Intended to manage topic filters including wildcards.
Internally, MQTTMatcher use a prefix tree (trie) to store
values associated with filters, and has an iter_match()
method to iterate efficiently over all filters that match
some topic name.
"""
# pylint: disable=too-few-public-methods
class Node:
"""Individual node on the MQTT prefix tree."""
__slots__ = "children", "content"
def __init__(self):
self.children = {}
self.content = None
def __init__(self):
self._root = self.Node()
def __setitem__(self, key, value):
"""Add a topic filter :key to the prefix tree
and associate it to :value"""
node = self._root
for sym in key.split("/"):
node = node.children.setdefault(sym, self.Node())
node.content = value
def __getitem__(self, key):
"""Retrieve the value associated with some topic filter :key"""
try:
node = self._root
for sym in key.split("/"):
node = node.children[sym]
if node.content is None:
raise KeyError(key)
return node.content
except KeyError:
raise KeyError(key) from None
def __delitem__(self, key):
"""Delete the value associated with some topic filter :key"""
lst = []
try:
parent, node = None, self._root
for k in key.split("/"):
parent, node = node, node.children[k]
lst.append((parent, k, node))
node.content = None
except KeyError:
raise KeyError(key) from None
else: # cleanup
for parent, k, node in reversed(lst):
if node.children or node.content is not None:
break
del parent.children[k]
def iter_match(self, topic):
"""Return an iterator on all values associated with filters
that match the :topic"""
lst = topic.split("/")
normal = not topic.startswith("$")
def rec(node, i=0):
if i == len(lst):
if node.content is not None:
yield node.content
else:
part = lst[i]
if part in node.children:
for content in rec(node.children[part], i + 1):
yield content
if "+" in node.children and (normal or i > 0):
for content in rec(node.children["+"], i + 1):
yield content
if "#" in node.children and (normal or i > 0):
content = node.children["#"].content
if content is not None:
yield content
return rec(self._root)

View File

@ -0,0 +1,277 @@
# The MIT License (MIT)
#
# Copyright (c) 2016 Radomir Dopieralski & Tony DiCola for Adafruit Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""
`adafruit_ht16k33.ht16k33`
===========================
* Authors: Radomir Dopieralski & Tony DiCola for Adafruit Industries
"""
from adafruit_bus_device import i2c_device
from micropython import const
__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_HT16K33.git"
_HT16K33_BLINK_CMD = const(0x80)
_HT16K33_BLINK_DISPLAYON = const(0x01)
_HT16K33_CMD_BRIGHTNESS = const(0xE0)
_HT16K33_OSCILATOR_ON = const(0x21)
class HT16K33:
"""
The base class for all displays. Contains common methods.
:param int address: The I2C addess of the HT16K33.
:param bool auto_write: True if the display should immediately change when
set. If False, `show` must be called explicitly.
:param float brightness: 0.0 - 1.0 default brightness level.
"""
def __init__(self, i2c, address=0x70, auto_write=True, brightness=1.0):
self.i2c_device = i2c_device.I2CDevice(i2c, address)
self._temp = bytearray(1)
self._buffer = bytearray(17)
self._auto_write = auto_write
self.fill(0)
self._write_cmd(_HT16K33_OSCILATOR_ON)
self._blink_rate = None
self._brightness = None
self.blink_rate = 0
self.brightness = brightness
def _write_cmd(self, byte):
self._temp[0] = byte
with self.i2c_device:
self.i2c_device.write(self._temp)
@property
def blink_rate(self):
"""The blink rate. Range 0-3."""
return self._blink_rate
@blink_rate.setter
def blink_rate(self, rate=None):
if not 0 <= rate <= 3:
raise ValueError("Blink rate must be an integer in the range: 0-3")
rate = rate & 0x03
self._blink_rate = rate
self._write_cmd(_HT16K33_BLINK_CMD | _HT16K33_BLINK_DISPLAYON | rate << 1)
@property
def brightness(self):
"""The brightness. Range 0.0-1.0"""
return self._brightness
@brightness.setter
def brightness(self, brightness):
if not 0.0 <= brightness <= 1.0:
raise ValueError(
"Brightness must be a decimal number in the range: 0.0-1.0"
)
self._brightness = brightness
xbright = round(15 * brightness)
xbright = xbright & 0x0F
self._write_cmd(_HT16K33_CMD_BRIGHTNESS | xbright)
@property
def auto_write(self):
"""Auto write updates to the display."""
return self._auto_write
@auto_write.setter
def auto_write(self, auto_write):
if isinstance(auto_write, bool):
self._auto_write = auto_write
else:
raise ValueError("Must set to either True or False.")
def show(self):
"""Refresh the display and show the changes."""
with self.i2c_device:
# Byte 0 is 0x00, address of LED data register. The remaining 16
# bytes are the display register data to set.
self.i2c_device.write(self._buffer)
def fill(self, color):
"""Fill the whole display with the given color."""
fill = 0xFF if color else 0x00
for i in range(16):
self._buffer[i + 1] = fill
if self._auto_write:
self.show()
def _pixel(self, x, y, color=None):
addr = 2 * y + x // 8
mask = 1 << x % 8
if color is None:
return bool(self._buffer[addr + 1] & mask)
if color:
# set the bit
self._buffer[addr + 1] |= mask
else:
# clear the bit
self._buffer[addr + 1] &= ~mask
if self._auto_write:
self.show()
return None
def _set_buffer(self, i, value):
self._buffer[i + 1] = value # Offset by 1 to move past register address.
def _get_buffer(self, i):
return self._buffer[i + 1] # Offset by 1 to move past register address.
class MatrixBackpack16x8(HT16K33):
"""A single matrix."""
_columns = 16
_rows = 8
def pixel(self, x, y, color=None):
"""Get or set the color of a given pixel."""
if not 0 <= x <= 15:
return None
if not 0 <= y <= 7:
return None
return super()._pixel(x, y, color)
def __getitem__(self, key):
x, y = key
return self.pixel(x, y)
def __setitem__(self, key, value):
x, y = key
self.pixel(x, y, value)
# pylint: disable=too-many-branches
def shift(self, x, y, rotate=False):
"""
Shift pixels by x and y
:param rotate: (Optional) Rotate the shifted pixels to the left side (default=False)
"""
auto_write = self.auto_write
self._auto_write = False
if x > 0: # Shift Right
for _ in range(x):
for row in range(0, self.rows):
last_pixel = self[self.columns - 1, row] if rotate else 0
for col in range(self.columns - 1, 0, -1):
self[col, row] = self[col - 1, row]
self[0, row] = last_pixel
elif x < 0: # Shift Left
for _ in range(-x):
for row in range(0, self.rows):
last_pixel = self[0, row] if rotate else 0
for col in range(0, self.columns - 1):
self[col, row] = self[col + 1, row]
self[self.columns - 1, row] = last_pixel
if y > 0: # Shift Up
for _ in range(y):
for col in range(0, self.columns):
last_pixel = self[col, self.rows - 1] if rotate else 0
for row in range(self.rows - 1, 0, -1):
self[col, row] = self[col, row - 1]
self[col, 0] = last_pixel
elif y < 0: # Shift Down
for _ in range(-y):
for col in range(0, self.columns):
last_pixel = self[col, 0] if rotate else 0
for row in range(0, self.rows - 1):
self[col, row] = self[col, row + 1]
self[col, self.rows - 1] = last_pixel
self._auto_write = auto_write
if auto_write:
self.show()
# pylint: enable=too-many-branches
def shift_right(self, rotate=False):
"""
Shift all pixels right
:param rotate: (Optional) Rotate the shifted pixels to the left side (default=False)
"""
self.shift(1, 0, rotate)
def shift_left(self, rotate=False):
"""
Shift all pixels left
:param rotate: (Optional) Rotate the shifted pixels to the right side (default=False)
"""
self.shift(-1, 0, rotate)
def shift_up(self, rotate=False):
"""
Shift all pixels up
:param rotate: (Optional) Rotate the shifted pixels to bottom (default=False)
"""
self.shift(0, 1, rotate)
def shift_down(self, rotate=False):
"""
Shift all pixels down
:param rotate: (Optional) Rotate the shifted pixels to top (default=False)
"""
self.shift(0, -1, rotate)
def image(self, img):
"""Set buffer to value of Python Imaging Library image. The image should
be in 1 bit mode and a size equal to the display size."""
imwidth, imheight = img.size
if imwidth != self.columns or imheight != self.rows:
raise ValueError(
"Image must be same dimensions as display ({0}x{1}).".format(
self.columns, self.rows
)
)
# Grab all the pixels from the image, faster than getpixel.
pixels = img.convert("1").load()
# Iterate through the pixels
for x in range(self.columns): # yes this double loop is slow,
for y in range(self.rows): # but these displays are small!
self.pixel(x, y, pixels[(x, y)])
if self._auto_write:
self.show()
@property
def columns(self):
"""Read-only property for number of columns"""
return self._columns
@property
def rows(self):
"""Read-only property for number of rows"""
return self._rows

View File

@ -0,0 +1,85 @@
# Paste mixgoce.py
import board
from rtc import RTC
from busio import I2C
def do_connect(username, password):
import wifi
from time import sleep
wifi.radio.connect(ssid=username.encode(), password=password.encode())
while not wifi.radio.ipv4_address:
sleep(0.1)
print("Wi-Fi connected!")
return wifi.radio
rtc_clock = RTC()
try:
i2c = I2C(board.IO41, board.IO42,frequency=400000)
while not i2c.try_lock():
pass
if 0x6a in i2c.scan():
version = 99
elif 0x15 in i2c.scan():
version = 1
else:
version = 0
i2c.unlock()
if version == 99:
i2c.deinit()
elif version:#new
import mxc6655xa
acc = mxc6655xa.MXC6655XA(i2c)
def get_temperature():
return acc.get_temperature()
else:#old
import msa301
acc = msa301.MSA301(i2c)
except Exception as e:
print(e)
ADDITIONAL_TOPIC = 'b640a0ce465fa2a4150c36b305c1c11b'
WILL_TOPIC = '9d634e1a156dc0c1611eb4c3cff57276'
def ntp(url='http://mixio.mixly.org/time.php'):
import ssl
import wifi
import socketpool
import adafruit_requests
#TEXT_URL = 'http://mixio.mixly.org/time.php'
#TEXT_URL = url
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
# print("Fetching text from", TEXT_URL)
response = requests.get(url)
# print("-" * 40)
# print(tuple(response.text.split(",")))
# print("-" * 40)
return tuple(response.text.split(","))
def analyse_sharekey(url):
import ssl
import wifi
import socketpool
import adafruit_requests
import json
#TEXT_URL = 'http://mixio.mixly.org/time.php'
#TEXT_URL = url
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
# print("Fetching text from", TEXT_URL)
response = requests.get(url)
# print("-" * 40)
# print(tuple(response.text.split(",")))
# print("-" * 40)
if response.text == '-1':
print('Invalid share key')
else:
result = json.loads(response.text)
return (result['0'], result['1'], result['2'])

View File

@ -0,0 +1,84 @@
#coding=utf-8
import math
def math_map(v, al, ah, bl, bh):
if al==ah:
return bl
if al > ah:
al, ah = ah, al
if v > ah:
v = ah
if v < al:
v = al
return bl + (bh - bl) * (v - al) / (ah - al)
def math_mean(myList):
localList = [e for e in myList if type(e) == int or type(e) == float]
if not localList: return
return float(sum(localList)) / len(localList)
def math_median(myList):
localList = sorted([e for e in myList if type(e) == int or type(e) == float])
if not localList: return
if len(localList) % 2 == 0:
return (localList[len(localList) // 2 - 1] + localList[len(localList) // 2]) / 2.0
else:
return localList[(len(localList) - 1) // 2]
def math_modes(some_list):
modes = []
# Using a lists of [item, count] to keep count rather than dict
# to avoid "unhashable" errors when the counted item is itself a list or dict.
counts = []
maxCount = 1
for item in some_list:
found = False
for count in counts:
if count[0] == item:
count[1] += 1
maxCount = max(maxCount, count[1])
found = True
if not found:
counts.append([item, 1])
for counted_item, item_count in counts:
if item_count == maxCount:
modes.append(counted_item)
return modes
def math_standard_deviation(numbers):
n = len(numbers)
if n == 0: return
mean = float(sum(numbers)) / n
variance = sum((x - mean) ** 2 for x in numbers) / n
return math.sqrt(variance)
def lists_sort(my_list, type, reverse):
def try_float(s):
try:
return float(s)
except:
return 0
key_funcs = {
"NUMERIC": try_float,
"TEXT": str,
"IGNORE_CASE": lambda s: str(s).lower()
}
key_func = key_funcs[type]
list_cpy = list(my_list)
return sorted(list_cpy, key=key_func, reverse=reverse)
def format_content(mydict, cid):
if 'lat' in mydict and 'long' in mydict:
res = '{'+'"lat": "{}", "long": "{}", "clientid": "{}"'.format(mydict.pop('lat'),mydict.pop('long'),cid)
if len(mydict)>0:
res += ', "message": ['
for d in mydict:
res += '{{"label": "{}", "value": "{}"}},'.format(d,mydict[d])
res = res[:-1] + "]"
res += '}'
return res
else:
print('Invalid Input')
def format_str(d):
return str(d).replace("'",'"')

View File

@ -0,0 +1,139 @@
"""
Matrix
CircuitPython library for Matrix - MixGoCE
=======================================================
Small Cabbage
20210721
"""
import time
from mixgoce import i2c
from matrix import MatrixBackpack16x8
from adafruit_framebuf import FrameBuffer,MVLSB
buf = bytearray(16)
fb = FrameBuffer(buf, 16, 8, MVLSB)
class MixGoMatrix(MatrixBackpack16x8):
def show_dynamic(self, to_show, delay=200):
# self.fill(0)
if type(to_show)==str or type(to_show)==int:
for i in to_show:
fb.fill(0)
fb.text(i, 5, 0, color=1)
# turn all LEDs off
self.fill(0)
for x in range(16):
# using the FrameBuffer text result
bite = buf[x]
for y in range(8):
bit = 1 << y & bite
# if bit > 0 then set the pixel brightness
if bit:
self[x, y] = 1
self.show()
if len(to_show)>1:
time.sleep(delay/1000)
elif type(to_show)==list or type(to_show)==tuple:
for i in to_show:
#if type(i)!=str and type(i)!=type(Image.HEART):
if type(i)!=str and type(i)!=type(bytearray(16)):
pass
for i in to_show:
self.show_dynamic(i)
time.sleep(delay/1000)
elif type(to_show)==type(bytearray(16)):
buf = to_show
# turn all LEDs off
self.fill(0)
# print(buf)
for x in range(16):
# using the FrameBuffer text result
bite = buf[x]
# print(bite,end=" ")
for y in range(8):
bit = 1 << y & bite
# if bit > 0 then set the pixel brightness
if bit:
self[x, y] = 1
self.show()
def scroll(self, text_to_show, delay=0):
for i in range(len(text_to_show) * 6 + 26):
fb.fill(0)
fb.text(text_to_show, -i + 16, 0, color=1)
# turn all LEDs off
self.fill(0)
for x in range(16):
# using the FrameBuffer text result
bite = buf[x]
for y in range(8):
bit = 1 << y & bite
# if bit > 0 then set the pixel brightness
if bit:
self[x, y] = 1
self.show()
time.sleep(delay/1000)
def show_static(self, text_to_show):
fb.fill(0)
if len(text_to_show)==2:
l = 3
elif len(text_to_show)==1:
l = 5
else:
l = 0
fb.text(text_to_show, l, 0, color=1)
# turn all LEDs off
self.fill(0)
for x in range(16):
# using the FrameBuffer text result
bite = buf[x]
for y in range(8):
bit = 1 << y & bite
# if bit > 0 then set the pixel brightness
if bit:
self[x, y] = 1
self.show()
def set_brightness(self, val):
self.brightness = max(min(val, 1), 0)
def get_brightness(self):
return self.brightness
def set_pixel(self, x, y, val):
self[x, y] = val
self.show()
def get_pixel(self, x, y):
return int(self[x, y])
def clear(self):
self.fill(0)
self.show()
def up(self, times, rotate=False):
for i in range(times):
self.shift_down(rotate)
self.show()
def down(self, times, rotate=False):
for i in range(times):
self.shift_up(rotate)
self.show()
def left(self, times, rotate=False):
for i in range(times):
self.shift_left(rotate)
self.show()
def right(self, times, rotate=False):
for i in range(times):
self.shift_right(rotate)
self.show()
display = MixGoMatrix(i2c, auto_write=False, brightness=0.1)
display.clear()

View File

@ -0,0 +1,70 @@
"""
MSA301
CircuitPython library for the MSA301 Accelerometer
=======================================================
dahanzimin
20210411
mixly
"""
import time
import adafruit_bus_device.i2c_device as i2c_device
from micropython import const
MSA301_ADDRESS = const(0x26)
MSA301_REG_DEVICE_ID = const(0x01)
MSA301_REG_DATA = const(0x02)
MSA301_REG_ODR = const(0x10)
MSA301_REG_POWERMODE = const(0x11)
MSA301_REG_RESRANGE = const(0x0F)
_STANDARD_GRAVITY = 9.806
class MSA301:
_BUFFER = bytearray(2)
data_reg = bytearray(6)
def __init__(self, i2c_bus):
self._device = i2c_device.I2CDevice(i2c_bus, MSA301_ADDRESS)
if self._chip_id() != 0x13:
raise AttributeError("Cannot find a MSA301")
self._write_u8(MSA301_REG_ODR,0X09) #RATE_500_HZ
self._write_u8(MSA301_REG_POWERMODE,0X12) #NORMAL & WIDTH_250_HZ
self._write_u8(MSA301_REG_RESRANGE,0X02) #RESOLUTION_14_BIT & RANGE_8_G
def _read_u8(self, address):
# Read an 8-bit unsigned value from the specified 8-bit address.
with self._device:
self._BUFFER[0] = address & 0xFF
self._device.write(self._BUFFER, end=1)
self._device.readinto(self._BUFFER, end=1)
return self._BUFFER[0]
def _write_u8(self, address, val):
# Write an 8-bit unsigned value to the specified 8-bit address.
with self._device:
self._BUFFER[0] = address & 0xFF
self._BUFFER[1] = val & 0xFF
self._device.write(self._BUFFER, end=2)
def _chip_id(self):
return self._read_u8(MSA301_REG_DEVICE_ID)
def u2s(self,n):
return n if n < (1 << 7) else n - (1 << 8)
@property
def acceleration(self):
for i in range(0,6):
self.data_reg[i]=self._read_u8(MSA301_REG_DATA+i)
x_acc=((self.u2s(self.data_reg[1])<<8|self.data_reg[0])>>2)/1024.0
y_acc=((self.u2s(self.data_reg[3])<<8|self.data_reg[2])>>2)/1024.0
z_acc=((self.u2s(self.data_reg[5])<<8|self.data_reg[4])>>2)/1024.0
return (-y_acc,-x_acc,z_acc)

View File

@ -0,0 +1,102 @@
"""
Music
CircuitPython library for Music - MixGoCE
=======================================================
Small Cabbage
20210721
"""
import time
from mixgoce import version
class Music:
def __init__(self, pin, duration='4', octave='6', tempo='63'):
import pwmio
self.pin = pin
self.pwm = pwmio.PWMOut(pin, duty_cycle=0, variable_frequency=True)
self.duration = duration
self.octave = octave
self.tempo = tempo
def set_duration(self, duration='4'):
self.duration = duration
def set_octave(self, octave='6'):
self.octave = octave
def sef_tempo(self, tempo='63'):
self.tempo = tempo
def set_duration_tempo(self, duration, tempo):#这项等于原来的set_tempo
self.duration = duration
self.tempo = tempo
def reset(self):
self.set_duration()
self.set_octave()
self.set_tempo()
def get_duration(self):
return self.duration
def get_octave(self):
return self.octave
def get_tempo(self):
return (self.tempo)
def play_demo(self, demo):
import adafruit_rtttl
if self.pwm:
self.pwm.deinit()
adafruit_rtttl.play(self.pin, demo)
def play_demo_print(self, demo):
import adafruit_rtttl
if self.pwm:
self.pwm.deinit()
adafruit_rtttl.play(self.pin, demo)
_, defaults, tune = demo.lower().split(":")
print(tune)
def play(self, note, duration=None):
import pwmio
if self.pwm:
self.pwm.deinit()
self.pwm = pwmio.PWMOut(self.pin, duty_cycle=0, variable_frequency=True)
self.pwm.frequency = int(note)
self.pwm.duty_cycle = 2 ** 15
if duration:
# print(1)
time.sleep(duration/1000)
self.pwm.duty_cycle = 0
def stop(self):
self.pwm.duty_cycle = 0
self.pwm.deinit()
BA_DING = "itchy:d=4,o=4,b=300:4b5,2g5"
JUMP_UP = "itchy:d=4,o=5,b=300:c,d,e,f,g"
JUMP_DOWN = "itchy:d=4,o=5,b=300:g,f,e,d,c"
POWER_UP = "itchy:d=4,o=4,b=300:4g4,4c5,4e5,2g5,4e5,1g5"
POWER_DOWN = "itchy:d=4,o=4,b=300:4g5,4d#5,4c5,2g4,4b4,1c5"
DADADADUM = "itchy:d=4,o=4,b=200:2p,g,g,g,1d#,p,f,f,f,1d"
#ENTERTAINER = "itchy:d=4,o=4,b=300:4d4,d#4,e4,2c5,4e4,2c5,4e4,1c5,c5,d5,d#5,e5,c5,d5,2e5,b4,2d5,1c5"
BIRTHDAY = "itchy:d=4,o=4,b=180:6c,12c,4d,4c,4f,2e,6c,12c,4d,4c,4g,2f,6c,12c,4c5,4a,4f,4e,4d,6a#,12a#,4a,4f,4g,2f"
#BLUES = "itchy:d=4,o=4,b=400:2c,4e,4g,4a,4a#,4a,4g,4e,2c,4e,4g,4a,4a#,4a,4g,4e,4f,4a,2c,4d,4d#,4d,4c,4a,2c,4e,4g,4a,4a#,4a,4g,4e,4g,4b,4d,4f,4f,4a,4c,4d#,2c,e,g,e,g,f,e,d"
#PYTHON = "itchy:d=4,o=4,b=380:d5,4b4,p,b4,b4,a#4,b4,g5,p,d5,d5,r,b4,c5,p,c5,c5,p,d5,1e5,p,c5,a4,p,a4,a4,g#4,a4,f#5,p,e5,e5,p,c5,b4,p,b4,b4,p,c5,1d5,p,d5,4b4,p,b4,b4,a#4,b4,b5,p,g5,g5,p,d5,c#5,p,a5,a5,p,a5,1a5,p,g5,f#5,p,a5,a5,g#5,a5,e5,p,a5,a5,g#5,a5,d5,p,c#5,d5,p,c#5,2d5,3p"
import board
if version:#new
buzzer = Music(board.IO40)
else:#old
from digitalio import DigitalInOut, Direction
buzzer = Music(board.IO17)
spk_en = DigitalInOut(board.IO40)
spk_en.direction = Direction.OUTPUT
spk_en.value = True

View File

@ -0,0 +1,80 @@
"""
MXC6655XA
CircuitPython library for the MXC6655XA Accelerometer
=======================================================
dahanzimin
20210411
mixly
"""
import time
import adafruit_bus_device.i2c_device as i2c_device
from micropython import const
MXC6655XA_ADDRESS = const(0x15)
MXC6655XA_REG_DATA = const(0x03)
MXC6655XA_REG_CTRL = const(0x0D)
MXC6655XA_REG_DEVICE_ID = const(0x0E)
MXC6655XA_CMD_8G_POWER_ON = const(0x40)
MXC6655XA_CMD_4G_POWER_ON = const(0x20)
MXC6655XA_CMD_2G_POWER_ON = const(0x00)
MXC6655XA_2G_SENSITIVITY =1024
MXC6655XA_4G_SENSITIVITY =512
MXC6655XA_8G_SENSITIVITY =256
MXC6655XA_T_ZERO =25
MXC6655XA_T_SENSITIVITY =0.586
class MXC6655XA:
_BUFFER = bytearray(2)
data_reg = bytearray(7)
def __init__(self, i2c_bus):
self._device = i2c_device.I2CDevice(i2c_bus, MXC6655XA_ADDRESS)
if self._chip_id() != 0x02:
raise AttributeError("Cannot find a MXC6655XA")
self._Enable() #star
time.sleep(0.3)
self.range = MXC6655XA_8G_SENSITIVITY #SET 8g range
def _read_u8(self, address):
# Read an 8-bit unsigned value from the specified 8-bit address.
with self._device:
self._BUFFER[0] = address & 0xFF
self._device.write(self._BUFFER, end=1)
self._device.readinto(self._BUFFER, end=1)
return self._BUFFER[0]
def _write_u8(self, address, val):
# Write an 8-bit unsigned value to the specified 8-bit address.
with self._device:
self._BUFFER[0] = address & 0xFF
self._BUFFER[1] = val & 0xFF
self._device.write(self._BUFFER, end=2)
def _chip_id(self):
return self._read_u8(MXC6655XA_REG_DEVICE_ID)
def _Enable(self):
self._write_u8(MXC6655XA_REG_CTRL,MXC6655XA_CMD_8G_POWER_ON)
def u2s(self,n):
return n if n < (1 << 7) else n - (1 << 8)
@property
def acceleration(self):
for i in range(0,6):
self.data_reg[i]=self._read_u8(MXC6655XA_REG_DATA+i)
x_acc=float((self.u2s(self.data_reg[0])<<8|self.data_reg[1])>>4)/self.range
y_acc=float((self.u2s(self.data_reg[2])<<8|self.data_reg[3])>>4)/self.range
z_acc=float((self.u2s(self.data_reg[4])<<8|self.data_reg[5])>>4)/self.range
#t_acc=float(self.u2s(self.data_reg[6]))*MXC6655XA_T_SENSITIVITY + MXC6655XA_T_ZERO
return (-y_acc,-x_acc,z_acc)
def get_temperature(self):
t_acc=float(self.u2s(self._read_u8(MXC6655XA_REG_DATA+6)))*MXC6655XA_T_SENSITIVITY + MXC6655XA_T_ZERO
return round(t_acc,1)

View File

@ -0,0 +1,71 @@
"""
RGB
CircuitPython library for RGB - MixGoCE
=======================================================
Small Cabbage
20210721
"""
import time
import board
import neopixel
def wheel(pos):
# Input a value 0 to 255 to get a color value.
# The colours are a transition r - g - b - back to r.
if pos < 0 or pos > 255:
return (0, 0, 0)
if pos < 85:
return (255 - pos * 3, pos * 3, 0)
if pos < 170:
pos -= 85
return (0, 255 - pos * 3, pos * 3)
pos -= 170
return (pos * 3, 0, 255 - pos * 3)
class RGB(object):
def __init__(self,pin=board.IO8,num=4,flag="RGB"):
self.pin = pin
self.num = num
self.pixels = neopixel.NeoPixel(pin, num, brightness=0.3, auto_write=False, pixel_order = flag)
def show_all(self, R, G, B):
color = (R, G, B)
self.pixels.fill(color)
def show_one(self, index, R, G, B):
color = (R, G, B)
self.pixels[index - 1] = color
def write(self):
self.pixels.show()
def color_chase(self, R, G, B, wait):
color = (R, G, B)
for i in range(self.num):
self.pixels[i] = color
time.sleep(wait/1000)
self.pixels.show()
def rainbow_cycle(self, wait):
for j in range(255):
for i in range(self.num):
rc_index = (i * 256 // self.num) + j
self.pixels[i] = wheel(rc_index & 255)
self.pixels.show()
time.sleep(wait/1000/256)
def change_mod(self,flag):
import time
if flag in ("RGB","GRB"):
self.pixels.deinit()
time.sleep(0.1)
self.pixels = neopixel.NeoPixel(self.pin, self.num, brightness=0.3, auto_write=False, pixel_order = flag)
rgb = RGB()
rgb.show_all(0,0,0)
rgb.write()

View File

@ -0,0 +1,55 @@
"""
Analog Sensor
CircuitPython library for Analog Sensor - MixGoCE
=======================================================
Small Cabbage
20210721
dahanzimin
20210423
"""
from mixgoce import version
class ADCSensor:
import board
__pins=[board.IO13,board.IO15,board.IO16]
__species = {}
__first_init = True
def __new__(cls, pin, *args, **kwargs):
if pin not in cls.__species.keys():
cls.__first_init = True
cls.__species[pin]=object.__new__(cls)
return cls.__species[pin]
def __init__(self,pin):
from analogio import AnalogIn
if self.__first_init:
self.__first_init = False
self.pin = AnalogIn(self.__pins[pin])
def read(self):
return self.pin.value
def get_brightness():
return ADCSensor(1).read()
def get_soundlevel(): #fix
value_d= []
for _ in range(5):
values = []
for _ in range(5):
val = ADCSensor(0).read()
values.append(val)
value_d.append(max(values) - min(values))
return max(value_d)
if version:#new
from mixgoce import acc
def get_temperature():
return acc.get_temperature()
else:#old
def get_temperature():
adc_val = ADCSensor(2).read()
return adc_val * 3.9 / 5900

View File

@ -0,0 +1,25 @@
"""
TouchPad
CircuitPython library for TouchPad - MixGoCE
=======================================================
Small Cabbage
20210721
dahanzimin
20210423
"""
import board
from touchio import TouchIn
class TouchPad():
def __init__(self,pin,v=18000):
self.pin = TouchIn(pin)
self.v = v
def is_touched(self):
return self.pin.raw_value > self.v
touch_T1 = TouchPad(board.IO4)
touch_T2 = TouchPad(board.IO5)
touch_T3 = TouchPad(board.IO6)
touch_T4 = TouchPad(board.IO7)

View File

@ -0,0 +1,32 @@
/*
* This file is part of the MicroPython project, http://micropython.org/
*
* The MIT License (MIT)
*
* Copyright (c) 2019 Scott Shawcroft for Adafruit Industries
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
// Micropython setup
#define MICROPY_HW_BOARD_NAME "MixGo CE"
#define MICROPY_HW_MCU_NAME "ESP32S2"
#define MICROPY_HW_NEOPIXEL (&pin_GPIO45)

View File

@ -0,0 +1,24 @@
USB_VID = 0x239A
USB_PID = 0x80A8
USB_PRODUCT = "MixGo CE"
USB_MANUFACTURER = "Espressif"
IDF_TARGET = esp32s2
INTERNAL_FLASH_FILESYSTEM = 1
LONGINT_IMPL = MPZ
# The default queue depth of 16 overflows on release builds,
# so increase it to 32.
CFLAGS += -DCFG_TUD_TASK_QUEUE_SZ=32
CIRCUITPY_ESP_FLASH_MODE=dio
CIRCUITPY_ESP_FLASH_FREQ=40m
CIRCUITPY_ESP_FLASH_SIZE=4MB
CIRCUITPY_MODULE=wroom
FROZEN_MPY_DIRS += $(TOP)/frozen/Adafruit_CircuitPython_Requests
FROZEN_MPY_DIRS += $(TOP)/frozen/Adafruit_CircuitPython_NeoPixel
FROZEN_MPY_DIRS += boards/$(BOARD)/mixgoce_lib

View File

@ -0,0 +1,50 @@
#include "shared-bindings/board/__init__.h"
STATIC const mp_rom_map_elem_t board_module_globals_table[] = {
CIRCUITPYTHON_BOARD_DICT_STANDARD_ITEMS
{ MP_ROM_QSTR(MP_QSTR_IO0), MP_ROM_PTR(&pin_GPIO0) },
{ MP_ROM_QSTR(MP_QSTR_IO1), MP_ROM_PTR(&pin_GPIO1) },
{ MP_ROM_QSTR(MP_QSTR_IO2), MP_ROM_PTR(&pin_GPIO2) },
{ MP_ROM_QSTR(MP_QSTR_IO3), MP_ROM_PTR(&pin_GPIO3) },
{ MP_ROM_QSTR(MP_QSTR_IO4), MP_ROM_PTR(&pin_GPIO4) },
{ MP_ROM_QSTR(MP_QSTR_IO5), MP_ROM_PTR(&pin_GPIO5) },
{ MP_ROM_QSTR(MP_QSTR_IO6), MP_ROM_PTR(&pin_GPIO6) },
{ MP_ROM_QSTR(MP_QSTR_IO7), MP_ROM_PTR(&pin_GPIO7) },
{ MP_ROM_QSTR(MP_QSTR_IO8), MP_ROM_PTR(&pin_GPIO8) },
{ MP_ROM_QSTR(MP_QSTR_IO9), MP_ROM_PTR(&pin_GPIO9) },
{ MP_ROM_QSTR(MP_QSTR_IO10), MP_ROM_PTR(&pin_GPIO10) },
{ MP_ROM_QSTR(MP_QSTR_IO11), MP_ROM_PTR(&pin_GPIO11) },
{ MP_ROM_QSTR(MP_QSTR_IO12), MP_ROM_PTR(&pin_GPIO12) },
{ MP_ROM_QSTR(MP_QSTR_IO13), MP_ROM_PTR(&pin_GPIO13) },
{ MP_ROM_QSTR(MP_QSTR_IO14), MP_ROM_PTR(&pin_GPIO14) },
{ MP_ROM_QSTR(MP_QSTR_IO15), MP_ROM_PTR(&pin_GPIO15) },
{ MP_ROM_QSTR(MP_QSTR_IO16), MP_ROM_PTR(&pin_GPIO16) },
{ MP_ROM_QSTR(MP_QSTR_IO17), MP_ROM_PTR(&pin_GPIO17) },
{ MP_ROM_QSTR(MP_QSTR_IO18), MP_ROM_PTR(&pin_GPIO18) },
{ MP_ROM_QSTR(MP_QSTR_IO19), MP_ROM_PTR(&pin_GPIO19) },
{ MP_ROM_QSTR(MP_QSTR_IO20), MP_ROM_PTR(&pin_GPIO20) },
{ MP_ROM_QSTR(MP_QSTR_IO21), MP_ROM_PTR(&pin_GPIO21) },
{ MP_ROM_QSTR(MP_QSTR_IO26), MP_ROM_PTR(&pin_GPIO26) },
{ MP_ROM_QSTR(MP_QSTR_IO33), MP_ROM_PTR(&pin_GPIO33) },
{ MP_ROM_QSTR(MP_QSTR_IO34), MP_ROM_PTR(&pin_GPIO34) },
{ MP_ROM_QSTR(MP_QSTR_IO35), MP_ROM_PTR(&pin_GPIO35) },
{ MP_ROM_QSTR(MP_QSTR_IO36), MP_ROM_PTR(&pin_GPIO36) },
{ MP_ROM_QSTR(MP_QSTR_IO37), MP_ROM_PTR(&pin_GPIO37) },
{ MP_ROM_QSTR(MP_QSTR_IO38), MP_ROM_PTR(&pin_GPIO38) },
{ MP_ROM_QSTR(MP_QSTR_IO39), MP_ROM_PTR(&pin_GPIO39) },
{ MP_ROM_QSTR(MP_QSTR_IO40), MP_ROM_PTR(&pin_GPIO40) },
{ MP_ROM_QSTR(MP_QSTR_IO41), MP_ROM_PTR(&pin_GPIO41) },
{ MP_ROM_QSTR(MP_QSTR_IO42), MP_ROM_PTR(&pin_GPIO42) },
{ MP_ROM_QSTR(MP_QSTR_TX), MP_ROM_PTR(&pin_GPIO43) },
{ MP_ROM_QSTR(MP_QSTR_IO43), MP_ROM_PTR(&pin_GPIO43) },
{ MP_ROM_QSTR(MP_QSTR_RX), MP_ROM_PTR(&pin_GPIO44) },
{ MP_ROM_QSTR(MP_QSTR_IO44), MP_ROM_PTR(&pin_GPIO44) },
{ MP_ROM_QSTR(MP_QSTR_IO45), MP_ROM_PTR(&pin_GPIO45) },
{ MP_ROM_QSTR(MP_QSTR_IO46), MP_ROM_PTR(&pin_GPIO46) },
{ MP_ROM_QSTR(MP_QSTR_NEOPIXEL), MP_ROM_PTR(&pin_GPIO45) },
};
MP_DEFINE_CONST_DICT(board_module_globals, board_module_globals_table);