diff --git a/.gitmodules b/.gitmodules index 9a10cc21e2..089162d33e 100644 --- a/.gitmodules +++ b/.gitmodules @@ -286,3 +286,6 @@ [submodule "frozen/Adafruit_CircuitPython_FakeRequests"] path = frozen/Adafruit_CircuitPython_FakeRequests url = https://github.com/adafruit/Adafruit_CircuitPython_FakeRequests.git +[submodule "ports/espressif/boards/mixgo_ce_udisk/cp_lib"] + path = ports/espressif/boards/mixgo_ce_udisk/cp_lib + url = https://github.com/dahanzimin/circuitpython_lib.git diff --git a/ports/espressif/boards/mixgo_ce_udisk/cp_lib b/ports/espressif/boards/mixgo_ce_udisk/cp_lib new file mode 160000 index 0000000000..d6bb0f58f6 --- /dev/null +++ b/ports/espressif/boards/mixgo_ce_udisk/cp_lib @@ -0,0 +1 @@ +Subproject commit d6bb0f58f62983f11e771c5ec91c934eb6ff2b82 diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/Image.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/Image.py deleted file mode 100644 index c799b4aadd..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/Image.py +++ /dev/null @@ -1,58 +0,0 @@ -""" -Image - -CircuitPython library for Image - MixGoCE -======================================================= - -Small Cabbage -20210721 -""" - -HEART=bytearray(b'\x00\x00\x00\x0c\x1e\x3f\x7e\xfc\x7e\x3f\x1e\x0c\x00\x00\x00\x00') -HEART_SMALL=bytearray(b'\x00\x00\x00\x00\x0c\x1e\x3c\x78\x3c\x1e\x0c\x00\x00\x00\x00\x00') -HAPPY=bytearray(b'\x00\x00\x00\x0c\x0c\x20\x40\x80\x80\x40\x20\x0c\x0c\x00\x00\x00') -SAD=bytearray(b'\x00\x00\x08\x04\x04\x84\x40\x20\x20\x40\x84\x04\x04\x08\x00\x00') -SMILE=bytearray(b'\x00\x00\x04\x02\x02\x24\x40\x80\x80\x40\x24\x02\x02\x04\x00\x00') -SILLY=bytearray(b'\x00\x00\x04\x0a\x0a\x04\x60\xa0\xa0\x60\x04\x0a\x0a\x04\x00\x00') -FABULOUS=bytearray(b'\x00\x00\x06\x05\x05\x65\xa6\xa0\xa0\xa6\x65\x05\x05\x06\x00\x00') -SURPRISED=bytearray(b'\x00\x00\x00\x00\x00\x00\x00\xbe\x00\x00\x00\x00\x00\x00\x00\x00') -ASLEEP=bytearray(b'\x00\x00\x04\x04\x04\x44\xa0\xa0\xa0\xa0\x44\x04\x04\x04\x00\x00') -ANGRY=bytearray(b'\x00\x00\x01\x02\x84\x42\x21\x10\x10\x21\x42\x84\x02\x01\x00\x00') -CONFUSED=bytearray(b'\x00\x00\x00\x00\x00\x04\x02\xa2\x12\x0c\x00\x00\x00\x00\x00\x00') -NO=bytearray(b'\x00\x00\x00\x00\x42\x24\x18\x18\x24\x42\x00\x00\x00\x00\x00\x00') -YES=bytearray(b'\x00\x00\x00\x00\x10\x20\x40\x40\x20\x10\x08\x04\x02\x00\x00\x00') -LEFT_ARROW=bytearray(b'\x00\x00\x10\x28\x54\x92\x10\x10\x10\x10\x10\x10\x10\x10\x00\x00') -RIGHT_ARROW=bytearray(b'\x00\x00\x10\x10\x10\x10\x10\x10\x10\x10\x92\x54\x28\x10\x00\x00') -DRESS=bytearray(b'\x00\x00\x00\x40\xa2\xd7\xae\xda\xae\xd7\xa2\x40\x00\x00\x00\x00') -TRANSFORMERS=bytearray(b'\x00\x00\x00\x00\x00\x9c\x64\x1a\x64\x9c\x00\x00\x00\x00\x00\x00') -SCISSORS=bytearray(b'\x00\x00\x00\x00\x00\x43\xa6\x6c\x18\x6c\xa6\x43\x00\x00\x00\x00') -EXIT=bytearray(b'\x00\x00\x00\x00\x00\x00\x28\x04\x72\x0e\x17\x25\x48\x88\x00\x00') -TREE=bytearray(b'\x00\x00\x00\x10\x18\x1c\x1e\xff\x1e\x1c\x18\x10\x00\x00\x00\x00') -PACMAN=bytearray(b'\x00\x00\x1c\x36\x63\x41\x45\x41\x49\x55\x22\x00\x00\x00\x00\x00') -TARGET=bytearray(b'\x00\x00\x00\x00\x00\x00\x38\x44\x54\x44\x38\x00\x00\x00\x00\x00') -TSHIRT=bytearray(b'\x00\x00\x00\x04\x0a\xf9\x82\x82\x82\x82\xf9\x0a\x04\x00\x00\x00') -ROLLERSKATE=bytearray(b'\x00\x00\x00\x60\x5f\x71\x11\x17\x14\x14\x74\x58\x60\x00\x00\x00') -DUCK=bytearray(b'\x00\x00\x08\x0c\x0a\xf9\x81\x83\x9e\x90\x90\x90\x50\x30\x10\x00') -HOUSE=bytearray(b'\x04\x06\xfb\x01\x01\xf9\x09\x29\x09\x09\xf9\x01\x01\xfb\x06\x04') -TORTOISE=bytearray(b'\x00\x00\x00\x00\x00\x5e\x3c\x3f\x3f\x3c\x5e\x00\x00\x00\x00\x00') -BUTTERFLY=bytearray(b'\x00\x00\x00\x00\x04\xca\xaa\x5c\x38\x5c\xaa\xca\x04\x00\x00\x00') -STICKFIGURE=bytearray(b'\x00\x00\x00\x00\x00\x90\x4a\x3d\x4a\x90\x00\x00\x00\x00\x00\x00') -GHOST=bytearray(b'\x00\x00\x00\x00\xfe\xdf\xe9\xdf\xe9\xdf\xfe\xc0\x80\x00\x00\x00') -PITCHFORK=bytearray(b'\x08\x08\x08\x08\x08\x08\x08\x08\x08\x08\x3e\x49\x49\x49\x49\x49') -MUSIC_QUAVERS=bytearray(b'\x20\x10\x08\x0c\x1c\x38\x30\x10\x08\x0c\x1c\x38\x30\x10\x08\x04') -MUSIC_QUAVER=bytearray(b'\x06\x05\x05\x05\x05\x05\x05\x05\x36\x7c\xfc\xfc\xf8\x70\x00\x00') -MUSIC_CROTCHET=bytearray(b'\x02\x02\x02\x02\x02\x02\x02\x02\x1a\x3e\x7e\x7e\x7c\x38\x00\x00') -COW=bytearray(b'\x00\x1e\x1a\x1f\xfe\xfc\x1c\x1c\x1c\xfc\xfc\x08\x10\x10\x20\x00') -RABBIT=bytearray(b'\x14\x2a\x2a\x2a\x2a\x2a\x63\x41\x55\x41\x49\x49\x5d\x41\x3e\x00') -SQUARE_SMALL=bytearray(b'\x00\x00\x00\x00\x00\x00\x3c\x24\x24\x3c\x00\x00\x00\x00\x00\x00') -SQUARE=bytearray(b'\x00\x00\x00\x00\xff\x81\x81\x81\x81\x81\x81\xff\x00\x00\x00\x00') -DIAMOND_SMALL=bytearray(b'\x00\x00\x00\x00\x00\x08\x14\x2c\x14\x08\x00\x00\x00\x00\x00\x00') -DIAMOND=bytearray(b'\x00\x00\x04\x0e\x1b\x35\x6f\xdd\x6f\x35\x1b\x0e\x04\x00\x00\x00') -CHESSBOARD=bytearray(b'\x00\x00\x00\xfe\xaa\xfe\xaa\xfe\xaa\xfe\xaa\xfe\xaa\xfe\x00\x00') -TRIANGLE_LEFT=bytearray(b'\x00\x00\x00\x00\x00\x10\x38\x7c\xfe\x00\x00\x00\x00\x00\x00\x00') -TRIANGLE=bytearray(b'\x00\x00\x40\x60\x70\x78\x7c\x7e\x7e\x7c\x78\x70\x60\x40\x00\x00') -SNAKE=bytearray(b'\x00\x40\x60\x70\x38\x18\x18\x18\x18\x1f\x05\x07\x00\x00\x00\x00') -UMBRELLA=bytearray(b'\x00\x00\x00\x08\x0c\x0e\x0e\xff\x8e\xce\x0c\x08\x00\x00\x00\x00') -SKULL=bytearray(b'\x00\x00\x00\x0c\x1a\x79\x99\xd7\x99\x79\x1a\x0c\x00\x00\x00\x00') -GIRAFFE=bytearray(b'\x00\x00\x00\x00\xe0\x20\x20\xff\x03\x02\x00\x00\x00\x00\x00\x00') -SWORD=bytearray(b'\x00\x00\x18\x18\x7e\x3c\x18\x18\x18\x18\x18\x18\x18\x00\x00\x00') \ No newline at end of file diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/adafruit_framebuf.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/adafruit_framebuf.py deleted file mode 100644 index 458dcc5410..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/adafruit_framebuf.py +++ /dev/null @@ -1,549 +0,0 @@ -# The MIT License (MIT) -# -# Copyright (c) 2018 Kattni Rembor for Adafruit Industries -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -""" -`adafruit_framebuf` -==================================================== - -CircuitPython pure-python framebuf module, based on the micropython framebuf module. - -* Author(s): Melissa LeBlanc-Williams, Kattni Rembor, Tony DiCola, original file - created by Damien P. George - -Implementation Notes --------------------- - -**Hardware:** - -* `Adafruit SSD1306 OLED displays `_ - -**Software and Dependencies:** - -* Adafruit CircuitPython firmware for the supported boards: - https://github.com/adafruit/circuitpython/releases - -""" - -__version__ = "0.0.0-auto.0" -__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_framebuf.git" - -import os -import struct - -# Framebuf format constants: -MVLSB = 0 # Single bit displays (like SSD1306 OLED) -RGB565 = 1 # 16-bit color displays -GS4_HMSB = 2 # Unimplemented! -MHMSB = 3 # Single bit displays like the Sharp Memory -RGB888 = 4 # Neopixels and Dotstars - - -class MHMSBFormat: - """MHMSBFormat""" - - @staticmethod - def set_pixel(framebuf, x, y, color): - """Set a given pixel to a color.""" - index = (y * framebuf.stride + x) // 8 - offset = 7 - x & 0x07 - framebuf.buf[index] = (framebuf.buf[index] & ~(0x01 << offset)) | ( - (color != 0) << offset - ) - - @staticmethod - def get_pixel(framebuf, x, y): - """Get the color of a given pixel""" - index = (y * framebuf.stride + x) // 8 - offset = 7 - x & 0x07 - return (framebuf.buf[index] >> offset) & 0x01 - - @staticmethod - def fill(framebuf, color): - """completely fill/clear the buffer with a color""" - if color: - fill = 0xFF - else: - fill = 0x00 - for i in range(len(framebuf.buf)): - framebuf.buf[i] = fill - - @staticmethod - def fill_rect(framebuf, x, y, width, height, color): - """Draw a rectangle at the given location, size and color. The ``fill_rect`` method draws - both the outline and interior.""" - # pylint: disable=too-many-arguments - for _x in range(x, x + width): - offset = 7 - _x & 0x07 - for _y in range(y, y + height): - index = (_y * framebuf.stride + _x) // 8 - framebuf.buf[index] = (framebuf.buf[index] & ~(0x01 << offset)) | ( - (color != 0) << offset - ) - - -class MVLSBFormat: - """MVLSBFormat""" - - @staticmethod - def set_pixel(framebuf, x, y, color): - """Set a given pixel to a color.""" - index = (y >> 3) * framebuf.stride + x - offset = y & 0x07 - framebuf.buf[index] = (framebuf.buf[index] & ~(0x01 << offset)) | ( - (color != 0) << offset - ) - - @staticmethod - def get_pixel(framebuf, x, y): - """Get the color of a given pixel""" - index = (y >> 3) * framebuf.stride + x - offset = y & 0x07 - return (framebuf.buf[index] >> offset) & 0x01 - - @staticmethod - def fill(framebuf, color): - """completely fill/clear the buffer with a color""" - if color: - fill = 0xFF - else: - fill = 0x00 - for i in range(len(framebuf.buf)): - framebuf.buf[i] = fill - - @staticmethod - def fill_rect(framebuf, x, y, width, height, color): - """Draw a rectangle at the given location, size and color. The ``fill_rect`` method draws - both the outline and interior.""" - # pylint: disable=too-many-arguments - while height > 0: - index = (y >> 3) * framebuf.stride + x - offset = y & 0x07 - for w_w in range(width): - framebuf.buf[index + w_w] = ( - framebuf.buf[index + w_w] & ~(0x01 << offset) - ) | ((color != 0) << offset) - y += 1 - height -= 1 - - -class RGB888Format: - """RGB888Format""" - - @staticmethod - def set_pixel(framebuf, x, y, color): - """Set a given pixel to a color.""" - index = (y * framebuf.stride + x) * 3 - if isinstance(color, tuple): - framebuf.buf[index : index + 3] = bytes(color) - else: - framebuf.buf[index : index + 3] = bytes( - ((color >> 16) & 255, (color >> 8) & 255, color & 255) - ) - - @staticmethod - def get_pixel(framebuf, x, y): - """Get the color of a given pixel""" - index = (y * framebuf.stride + x) * 3 - return ( - (framebuf.buf[index] << 16) - | (framebuf.buf[index + 1] << 8) - | framebuf.buf[index + 2] - ) - - @staticmethod - def fill(framebuf, color): - """completely fill/clear the buffer with a color""" - fill = (color >> 16) & 255, (color >> 8) & 255, color & 255 - for i in range(0, len(framebuf.buf), 3): - framebuf.buf[i : i + 3] = bytes(fill) - - @staticmethod - def fill_rect(framebuf, x, y, width, height, color): - """Draw a rectangle at the given location, size and color. The ``fill_rect`` method draws - both the outline and interior.""" - # pylint: disable=too-many-arguments - fill = (color >> 16) & 255, (color >> 8) & 255, color & 255 - for _x in range(x, x + width): - for _y in range(y, y + height): - index = (_y * framebuf.stride + _x) * 3 - framebuf.buf[index : index + 3] = bytes(fill) - - -class FrameBuffer: - """FrameBuffer object. - - :param buf: An object with a buffer protocol which must be large enough to contain every - pixel defined by the width, height and format of the FrameBuffer. - :param width: The width of the FrameBuffer in pixel - :param height: The height of the FrameBuffer in pixel - :param buf_format: Specifies the type of pixel used in the FrameBuffer; permissible values - are listed under Constants below. These set the number of bits used to - encode a color value and the layout of these bits in ``buf``. Where a - color value c is passed to a method, c is a small integer with an encoding - that is dependent on the format of the FrameBuffer. - :param stride: The number of pixels between each horizontal line of pixels in the - FrameBuffer. This defaults to ``width`` but may need adjustments when - implementing a FrameBuffer within another larger FrameBuffer or screen. The - ``buf`` size must accommodate an increased step size. - - """ - - def __init__(self, buf, width, height, buf_format=MVLSB, stride=None): - # pylint: disable=too-many-arguments - self.buf = buf - self.width = width - self.height = height - self.stride = stride - self._font = None - if self.stride is None: - self.stride = width - if buf_format == MVLSB: - self.format = MVLSBFormat() - elif buf_format == MHMSB: - self.format = MHMSBFormat() - elif buf_format == RGB888: - self.format = RGB888Format() - else: - raise ValueError("invalid format") - self._rotation = 0 - - @property - def rotation(self): - """The rotation setting of the display, can be one of (0, 1, 2, 3)""" - return self._rotation - - @rotation.setter - def rotation(self, val): - if not val in (0, 1, 2, 3): - raise RuntimeError("Bad rotation setting") - self._rotation = val - - def fill(self, color): - """Fill the entire FrameBuffer with the specified color.""" - self.format.fill(self, color) - - def fill_rect(self, x, y, width, height, color): - """Draw a rectangle at the given location, size and color. The ``fill_rect`` method draws - both the outline and interior.""" - # pylint: disable=too-many-arguments, too-many-boolean-expressions - self.rect(x, y, width, height, color, fill=True) - - def pixel(self, x, y, color=None): - """If ``color`` is not given, get the color value of the specified pixel. If ``color`` is - given, set the specified pixel to the given color.""" - if self.rotation == 1: - x, y = y, x - x = self.width - x - 1 - if self.rotation == 2: - x = self.width - x - 1 - y = self.height - y - 1 - if self.rotation == 3: - x, y = y, x - y = self.height - y - 1 - - if x < 0 or x >= self.width or y < 0 or y >= self.height: - return None - if color is None: - return self.format.get_pixel(self, x, y) - self.format.set_pixel(self, x, y, color) - return None - - def hline(self, x, y, width, color): - """Draw a horizontal line up to a given length.""" - self.rect(x, y, width, 1, color, fill=True) - - def vline(self, x, y, height, color): - """Draw a vertical line up to a given length.""" - self.rect(x, y, 1, height, color, fill=True) - - def circle(self, center_x, center_y, radius, color): - """Draw a circle at the given midpoint location, radius and color. - The ```circle``` method draws only a 1 pixel outline.""" - x = radius - 1 - y = 0 - d_x = 1 - d_y = 1 - err = d_x - (radius << 1) - while x >= y: - self.pixel(center_x + x, center_y + y, color) - self.pixel(center_x + y, center_y + x, color) - self.pixel(center_x - y, center_y + x, color) - self.pixel(center_x - x, center_y + y, color) - self.pixel(center_x - x, center_y - y, color) - self.pixel(center_x - y, center_y - x, color) - self.pixel(center_x + y, center_y - x, color) - self.pixel(center_x + x, center_y - y, color) - if err <= 0: - y += 1 - err += d_y - d_y += 2 - if err > 0: - x -= 1 - d_x += 2 - err += d_x - (radius << 1) - - def rect(self, x, y, width, height, color, *, fill=False): - """Draw a rectangle at the given location, size and color. The ```rect``` method draws only - a 1 pixel outline.""" - # pylint: disable=too-many-arguments - if self.rotation == 1: - x, y = y, x - width, height = height, width - x = self.width - x - width - if self.rotation == 2: - x = self.width - x - width - y = self.height - y - height - if self.rotation == 3: - x, y = y, x - width, height = height, width - y = self.height - y - height - - # pylint: disable=too-many-boolean-expressions - if ( - width < 1 - or height < 1 - or (x + width) <= 0 - or (y + height) <= 0 - or y >= self.height - or x >= self.width - ): - return - x_end = min(self.width - 1, x + width - 1) - y_end = min(self.height - 1, y + height - 1) - x = max(x, 0) - y = max(y, 0) - if fill: - self.format.fill_rect(self, x, y, x_end - x + 1, y_end - y + 1, color) - else: - self.format.fill_rect(self, x, y, x_end - x + 1, 1, color) - self.format.fill_rect(self, x, y, 1, y_end - y + 1, color) - self.format.fill_rect(self, x, y_end, x_end - x + 1, 1, color) - self.format.fill_rect(self, x_end, y, 1, y_end - y + 1, color) - - def line(self, x_0, y_0, x_1, y_1, color): - # pylint: disable=too-many-arguments - """Bresenham's line algorithm""" - d_x = abs(x_1 - x_0) - d_y = abs(y_1 - y_0) - x, y = x_0, y_0 - s_x = -1 if x_0 > x_1 else 1 - s_y = -1 if y_0 > y_1 else 1 - if d_x > d_y: - err = d_x / 2.0 - while x != x_1: - self.pixel(x, y, color) - err -= d_y - if err < 0: - y += s_y - err += d_x - x += s_x - else: - err = d_y / 2.0 - while y != y_1: - self.pixel(x, y, color) - err -= d_x - if err < 0: - x += s_x - err += d_y - y += s_y - self.pixel(x, y, color) - - def blit(self): - """blit is not yet implemented""" - raise NotImplementedError() - - def scroll(self, delta_x, delta_y): - """shifts framebuf in x and y direction""" - if delta_x < 0: - shift_x = 0 - xend = self.width + delta_x - dt_x = 1 - else: - shift_x = self.width - 1 - xend = delta_x - 1 - dt_x = -1 - if delta_y < 0: - y = 0 - yend = self.height + delta_y - dt_y = 1 - else: - y = self.height - 1 - yend = delta_y - 1 - dt_y = -1 - while y != yend: - x = shift_x - while x != xend: - self.format.set_pixel( - self, x, y, self.format.get_pixel(self, x - delta_x, y - delta_y) - ) - x += dt_x - y += dt_y - - # pylint: disable=too-many-arguments - def text(self, string, x, y, color, *, font_name="font5x8.bin", size=1): - """Place text on the screen in variables sizes. Breaks on \n to next line. - - Does not break on line going off screen. - """ - # determine our effective width/height, taking rotation into account - frame_width = self.width - frame_height = self.height - if self.rotation == 1 or self.rotation == 3: - frame_width, frame_height = frame_height, frame_width - - for chunk in string.split("\n"): - if not self._font or self._font.font_name != font_name: - # load the font! - self._font = BitmapFont(font_name) - width = self._font.font_width - height = self._font.font_height - for i, char in enumerate(chunk): - char_x = x + (i * (width + 1)) * size - if ( - char_x + (width * size) > 0 - and char_x < frame_width - and y + (height * size) > 0 - and y < frame_height - ): - self._font.draw_char(char, char_x, y, self, color, size=size) - y += height * size - - # pylint: enable=too-many-arguments - - def image(self, img): - """Set buffer to value of Python Imaging Library image. The image should - be in 1 bit mode and a size equal to the display size.""" - # determine our effective width/height, taking rotation into account - width = self.width - height = self.height - if self.rotation == 1 or self.rotation == 3: - width, height = height, width - - if isinstance(self.format, RGB888Format) and img.mode != "RGB": - raise ValueError("Image must be in mode RGB.") - if isinstance(self.format, (MHMSBFormat, MVLSBFormat)) and img.mode != "1": - raise ValueError("Image must be in mode 1.") - - imwidth, imheight = img.size - if imwidth != width or imheight != height: - raise ValueError( - "Image must be same dimensions as display ({0}x{1}).".format( - width, height - ) - ) - # Grab all the pixels from the image, faster than getpixel. - pixels = img.load() - # Clear buffer - for i in range(len(self.buf)): - self.buf[i] = 0 - # Iterate through the pixels - for x in range(width): # yes this double loop is slow, - for y in range(height): # but these displays are small! - if img.mode == "RGB": - self.pixel(x, y, pixels[(x, y)]) - elif pixels[(x, y)]: - self.pixel(x, y, 1) # only write if pixel is true - - -# MicroPython basic bitmap font renderer. -# Author: Tony DiCola -# License: MIT License (https://opensource.org/licenses/MIT) -class BitmapFont: - """A helper class to read binary font tiles and 'seek' through them as a - file to display in a framebuffer. We use file access so we dont waste 1KB - of RAM on a font!""" - - def __init__(self, font_name="font5x8.bin"): - # Specify the drawing area width and height, and the pixel function to - # call when drawing pixels (should take an x and y param at least). - # Optionally specify font_name to override the font file to use (default - # is font5x8.bin). The font format is a binary file with the following - # format: - # - 1 unsigned byte: font character width in pixels - # - 1 unsigned byte: font character height in pixels - # - x bytes: font data, in ASCII order covering all 255 characters. - # Each character should have a byte for each pixel column of - # data (i.e. a 5x8 font has 5 bytes per character). - self.font_name = font_name - - # Open the font file and grab the character width and height values. - # Note that only fonts up to 8 pixels tall are currently supported. - try: - self._font_code=b'\x05\x08\x00\x00\x00\x00\x00>[O[>>kOk>\x1c>|>\x1c\x18<~<\x18\x1cW}W\x1c\x1c^\x7f^\x1c\x00\x18<\x18\x00\xff\xe7\xc3\xe7\xff\x00\x18$\x18\x00\xff\xe7\xdb\xe7\xff0H:\x06\x0e&)y)&@\x7f\x05\x05\x07@\x7f\x05%?Z<\xe7\x1c\x1c\x08\x08\x1c\x1c>\x7f\x14"\x7f"\x14__\x00__\x06\t\x7f\x01\x7f\x00f\x89\x95j`````\x94\xa2\xff\xa2\x94\x08\x04~\x04\x08\x10 ~ \x10\x08\x08*\x1c\x08\x08\x1c*\x08\x08\x1e\x10\x10\x10\x10\x0c\x1e\x0c\x1e\x0c08>80\x06\x0e>\x0e\x06\x00\x00\x00\x00\x00\x00\x00_\x00\x00\x00\x07\x00\x07\x00\x14\x7f\x14\x7f\x14$*\x7f*\x12#\x13\x08db6IV P\x00\x08\x07\x03\x00\x00\x1c"A\x00\x00A"\x1c\x00*\x1c\x7f\x1c*\x08\x08>\x08\x08\x00\x80p0\x00\x08\x08\x08\x08\x08\x00\x00``\x00 \x10\x08\x04\x02>QIE>\x00B\x7f@\x00rIIIF!AIM3\x18\x14\x12\x7f\x10\'EEE9A]YN|\x12\x11\x12|\x7fIII6>AAA"\x7fAAA>\x7fIIIA\x7f\t\t\t\x01>AAQs\x7f\x08\x08\x08\x7f\x00A\x7fA\x00 @A?\x01\x7f\x08\x14"A\x7f@@@@\x7f\x02\x1c\x02\x7f\x7f\x04\x08\x10\x7f>AAA>\x7f\t\t\t\x06>AQ!^\x7f\t\x19)F&III2\x03\x01\x7f\x01\x03?@@@?\x1f @ \x1f?@8@?c\x14\x08\x14c\x03\x04x\x04\x03aYIMC\x00\x7fAAA\x02\x04\x08\x10 \x00AAA\x7f\x04\x02\x01\x02\x04@@@@@\x00\x03\x07\x08\x00 TTx@\x7f(DD88DDD(8DD(\x7f8TTT\x18\x00\x08~\t\x02\x18\xa4\xa4\x9cx\x7f\x08\x04\x04x\x00D}@\x00 @@=\x00\x7f\x10(D\x00\x00A\x7f@\x00|\x04x\x04x|\x08\x04\x04x8DDD8\xfc\x18$$\x18\x18$$\x18\xfc|\x08\x04\x04\x08HTTT$\x04\x04?D$<@@ |\x1c @ \x1c<@0@III\x00~\x01\x01\x01~*****DD_DD@QJD@@DJQ@\x00\x00\xff\x01\x03\xe0\x80\xff\x00\x00\x08\x08kk\x086\x126$6\x06\x0f\t\x0f\x06\x00\x00\x18\x18\x00\x00\x00\x10\x10\x000@\xff\x01\x01\x00\x1f\x01\x01\x1e\x00\x19\x1d\x17\x12\x00<<<<\x00\x00\x00\x00\x00' - self.font_width, self.font_height = struct.unpack("BB", b'\x05\x08') - # simple font file validation check based on expected file size - #if 2 + 256 * self.font_width != os.stat(font_name)[6]: - # raise RuntimeError("Invalid font file: " + font_name) - except OSError: - print("Could not find font file", font_name) - raise - except OverflowError: - # os.stat can throw this on boards without long int support - # just hope the font file is valid and press on - pass - - def deinit(self): - """Close the font file as cleanup.""" - #self._font.close() - - def __enter__(self): - """Initialize/open the font file""" - self.__init__() - return self - - def __exit__(self, exception_type, exception_value, traceback): - """cleanup on exit""" - #self.deinit() - pass - - def draw_char( - self, char, x, y, framebuffer, color, size=1 - ): # pylint: disable=too-many-arguments - """Draw one character at position (x,y) to a framebuffer in a given color""" - size = max(size, 1) - # Don't draw the character if it will be clipped off the visible area. - # if x < -self.font_width or x >= framebuffer.width or \ - # y < -self.font_height or y >= framebuffer.height: - # return - # Go through each column of the character. - for char_x in range(self.font_width): - # Grab the byte for the current column of font data. - #self._font.seek(2 + (ord(char) * self.font_width) + char_x) - chcode=bytes([self._font_code[(2 + (ord(char) * self.font_width) + char_x)]]) - try: - line = struct.unpack("B", chcode)[0] - except RuntimeError: - continue # maybe character isnt there? go to next - # Go through each row in the column byte. - for char_y in range(self.font_height): - # Draw a pixel for each bit that's flipped on. - if (line >> char_y) & 0x1: - framebuffer.fill_rect( - x + char_x * size, y + char_y * size, size, size, color - ) - - def width(self, text): - """Return the pixel width of the specified text message.""" - return len(text) * (self.font_width + 1) - - -class FrameBuffer1(FrameBuffer): # pylint: disable=abstract-method - """FrameBuffer1 object. Inherits from FrameBuffer.""" diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/adafruit_irremote.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/adafruit_irremote.py deleted file mode 100644 index e7f1476cfc..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/adafruit_irremote.py +++ /dev/null @@ -1,283 +0,0 @@ -# The MIT License (MIT) -# -# Copyright (c) 2017 Scott Shawcroft for Adafruit Industries -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -""" -`adafruit_irremote` -==================================================== - -Demo code for Circuit Playground Express: - -.. code-block:: python - - # Circuit Playground Express Demo Code - # Adjust the pulseio 'board.PIN' if using something else - import pulseio - import board - import adafruit_irremote - - pulsein = pulseio.PulseIn(board.REMOTEIN, maxlen=120, idle_state=True) - decoder = adafruit_irremote.GenericDecode() - - - while True: - pulses = decoder.read_pulses(pulsein) - print("Heard", len(pulses), "Pulses:", pulses) - try: - code = decoder.decode_bits(pulses) - print("Decoded:", code) - except adafruit_irremote.IRNECRepeatException: # unusual short code! - print("NEC repeat!") - except adafruit_irremote.IRDecodeException as e: # failed to decode - print("Failed to decode: ", e.args) - - print("----------------------------") - -* Author(s): Scott Shawcroft - -Implementation Notes --------------------- - -**Hardware:** - -* `CircuitPlayground Express `_ - -* `IR Receiver Sensor `_ - -**Software and Dependencies:** - -* Adafruit CircuitPython firmware for the ESP8622 and M0-based boards: - https://github.com/adafruit/circuitpython/releases - -""" - -# Pretend self matter because we may add object level config later. -# pylint: disable=no-self-use - -import array -import time - -__version__ = "0.0.0-auto.0" -__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_IRRemote.git" - - -class IRDecodeException(Exception): - """Generic decode exception""" - - -class IRNECRepeatException(Exception): - """Exception when a NEC repeat is decoded""" - - -class GenericDecode: - """Generic decoding of infrared signals""" - - def bin_data(self, pulses): - """Compute bins of pulse lengths where pulses are +-25% of the average. - - :param list pulses: Input pulse lengths - """ - bins = [[pulses[0], 0]] - - for _, pulse in enumerate(pulses): - matchedbin = False - # print(pulse, end=": ") - for b, pulse_bin in enumerate(bins): - if pulse_bin[0] * 0.75 <= pulse <= pulse_bin[0] * 1.25: - # print("matches bin") - bins[b][0] = (pulse_bin[0] + pulse) // 2 # avg em - bins[b][1] += 1 # track it - matchedbin = True - break - if not matchedbin: - bins.append([pulse, 1]) - # print(bins) - return bins - - def decode_bits(self, pulses): - """Decode the pulses into bits.""" - # pylint: disable=too-many-branches,too-many-statements - - # special exception for NEC repeat code! - if ( - (len(pulses) == 3) - and (8000 <= pulses[0] <= 10000) - and (2000 <= pulses[1] <= 3000) - and (450 <= pulses[2] <= 700) - ): - raise IRNECRepeatException() - - if len(pulses) < 10: - raise IRDecodeException("10 pulses minimum") - - # Ignore any header (evens start at 1), and any trailer. - if len(pulses) % 2 == 0: - pulses_end = -1 - else: - pulses_end = None - - evens = pulses[1:pulses_end:2] - odds = pulses[2:pulses_end:2] - - # bin both halves - even_bins = self.bin_data(evens) - odd_bins = self.bin_data(odds) - - outliers = [b[0] for b in (even_bins + odd_bins) if b[1] == 1] - even_bins = [b for b in even_bins if b[1] > 1] - odd_bins = [b for b in odd_bins if b[1] > 1] - - if not even_bins or not odd_bins: - raise IRDecodeException("Not enough data") - - if len(even_bins) == 1: - pulses = odds - pulse_bins = odd_bins - elif len(odd_bins) == 1: - pulses = evens - pulse_bins = even_bins - else: - raise IRDecodeException("Both even/odd pulses differ") - - if len(pulse_bins) == 1: - raise IRDecodeException("Pulses do not differ") - if len(pulse_bins) > 2: - raise IRDecodeException("Only mark & space handled") - - mark = min(pulse_bins[0][0], pulse_bins[1][0]) - space = max(pulse_bins[0][0], pulse_bins[1][0]) - - if outliers: - # skip outliers - pulses = [ - p - for p in pulses - if not (outliers[0] * 0.75) <= p <= (outliers[0] * 1.25) - ] - # convert marks/spaces to 0 and 1 - for i, pulse_length in enumerate(pulses): - if (space * 0.75) <= pulse_length <= (space * 1.25): - pulses[i] = False - elif (mark * 0.75) <= pulse_length <= (mark * 1.25): - pulses[i] = True - else: - raise IRDecodeException("Pulses outside mark/space") - - # convert bits to bytes! - output = [0] * ((len(pulses) + 7) // 8) - for i, pulse_length in enumerate(pulses): - output[i // 8] = output[i // 8] << 1 - if pulse_length: - output[i // 8] |= 1 - return output - - def _read_pulses_non_blocking( - self, input_pulses, max_pulse=10000, pulse_window=0.10 - ): - """Read out a burst of pulses without blocking until pulses stop for a specified - period (pulse_window), pruning pulses after a pulse longer than ``max_pulse``. - - :param ~pulseio.PulseIn input_pulses: Object to read pulses from - :param int max_pulse: Pulse duration to end a burst - :param float pulse_window: pulses are collected for this period of time - """ - received = None - recent_count = 0 - pruning = False - while True: - while input_pulses: - pulse = input_pulses.popleft() - recent_count += 1 - if pulse > max_pulse: - if received is None: - continue - pruning = True - if not pruning: - if received is None: - received = [] - received.append(pulse) - - if recent_count == 0: - return received - recent_count = 0 - time.sleep(pulse_window) - - def read_pulses( - self, - input_pulses, - *, - max_pulse=10000, - blocking=True, - pulse_window=0.10, - blocking_delay=0.10 - ): - """Read out a burst of pulses until pulses stop for a specified - period (pulse_window), pruning pulses after a pulse longer than ``max_pulse``. - - :param ~pulseio.PulseIn input_pulses: Object to read pulses from - :param int max_pulse: Pulse duration to end a burst - :param bool blocking: If True, will block until pulses found. - If False, will return None if no pulses. - Defaults to True for backwards compatibility - :param float pulse_window: pulses are collected for this period of time - :param float blocking_delay: delay between pulse checks when blocking - """ - while True: - pulses = self._read_pulses_non_blocking( - input_pulses, max_pulse, pulse_window - ) - if blocking and pulses is None: - time.sleep(blocking_delay) - continue - return pulses - - -class GenericTransmit: - """Generic infrared transmit class that handles encoding.""" - - def __init__(self, header, one, zero, trail): - self.header = header - self.one = one - self.zero = zero - self.trail = trail - - def transmit(self, pulseout, data): - """Transmit the ``data`` using the ``pulseout``. - - :param pulseio.PulseOut pulseout: PulseOut to transmit on - :param bytearray data: Data to transmit - """ - durations = array.array("H", [0] * (2 + len(data) * 8 * 2 + 1)) - durations[0] = self.header[0] - durations[1] = self.header[1] - durations[-1] = self.trail - out = 2 - for byte_index, _ in enumerate(data): - for i in range(7, -1, -1): - if (data[byte_index] & 1 << i) > 0: - durations[out] = self.one[0] - durations[out + 1] = self.one[1] - else: - durations[out] = self.zero[0] - durations[out + 1] = self.zero[1] - out += 2 - - # print(durations) - pulseout.send(durations) diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/adafruit_minimqtt.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/adafruit_minimqtt.py deleted file mode 100644 index 4dde03eb5d..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/adafruit_minimqtt.py +++ /dev/null @@ -1,1000 +0,0 @@ -# SPDX-FileCopyrightText: 2019-2021 Brent Rubell for Adafruit Industries -# -# SPDX-License-Identifier: MIT - -# Original Work Copyright (c) 2016 Paul Sokolovsky, uMQTT -# Modified Work Copyright (c) 2019 Bradley Beach, esp32spi_mqtt -# Modified Work Copyright (c) 2012-2019 Roger Light and others, Paho MQTT Python - -""" -`adafruit_minimqtt` -================================================================================ - -A minimal MQTT Library for CircuitPython. - -* Author(s): Brent Rubell - -Implementation Notes --------------------- - -Adapted from https://github.com/micropython/micropython-lib/tree/master/umqtt.simple/umqtt - -**Software and Dependencies:** - -* Adafruit CircuitPython firmware for the supported boards: - https://github.com/adafruit/circuitpython/releases - -""" -import errno -import struct -import time -from random import randint -from micropython import const -from matcher import MQTTMatcher - -__version__ = "0.0.0-auto.0" -__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_MiniMQTT.git" - -# Client-specific variables -MQTT_MSG_MAX_SZ = const(268435455) -MQTT_MSG_SZ_LIM = const(10000000) -MQTT_TOPIC_LENGTH_LIMIT = const(65535) -MQTT_TCP_PORT = const(1883) -MQTT_TLS_PORT = const(8883) - -# MQTT Commands -MQTT_PINGREQ = b"\xc0\0" -MQTT_PINGRESP = const(0xD0) -MQTT_SUB = b"\x82" -MQTT_UNSUB = b"\xA2" -MQTT_DISCONNECT = b"\xe0\0" - -# Variable CONNECT header [MQTT 3.1.2] -MQTT_HDR_CONNECT = bytearray(b"\x04MQTT\x04\x02\0\0") - - -CONNACK_ERRORS = { - const(0x01): "Connection Refused - Incorrect Protocol Version", - const(0x02): "Connection Refused - ID Rejected", - const(0x03): "Connection Refused - Server unavailable", - const(0x04): "Connection Refused - Incorrect username/password", - const(0x05): "Connection Refused - Unauthorized", -} - -_default_sock = None # pylint: disable=invalid-name -_fake_context = None # pylint: disable=invalid-name - - -class MMQTTException(Exception): - """MiniMQTT Exception class.""" - - # pylint: disable=unnecessary-pass - # pass - - -# Legacy ESP32SPI Socket API -def set_socket(sock, iface=None): - """Legacy API for setting the socket and network interface. - - :param sock: socket object. - :param iface: internet interface object - - """ - global _default_sock # pylint: disable=invalid-name, global-statement - global _fake_context # pylint: disable=invalid-name, global-statement - _default_sock = sock - if iface: - _default_sock.set_interface(iface) - _fake_context = _FakeSSLContext(iface) - - -class _FakeSSLSocket: - def __init__(self, socket, tls_mode): - self._socket = socket - self._mode = tls_mode - self.settimeout = socket.settimeout - self.send = socket.send - self.recv = socket.recv - self.close = socket.close - - def connect(self, address): - """connect wrapper to add non-standard mode parameter""" - try: - return self._socket.connect(address, self._mode) - except RuntimeError as error: - raise OSError(errno.ENOMEM) from error - - -class _FakeSSLContext: - def __init__(self, iface): - self._iface = iface - - def wrap_socket(self, socket, server_hostname=None): - """Return the same socket""" - # pylint: disable=unused-argument - return _FakeSSLSocket(socket, self._iface.TLS_MODE) - - -class MQTT: - """MQTT Client for CircuitPython. - - :param str broker: MQTT Broker URL or IP Address. - :param int port: Optional port definition, defaults to 8883. - :param str username: Username for broker authentication. - :param str password: Password for broker authentication. - :param network_manager: NetworkManager object, such as WiFiManager from ESPSPI_WiFiManager. - :param str client_id: Optional client identifier, defaults to a unique, generated string. - :param bool is_ssl: Sets a secure or insecure connection with the broker. - :param int keep_alive: KeepAlive interval between the broker and the MiniMQTT client. - :param socket socket_pool: A pool of socket resources available for the given radio. - :param ssl_context: SSL context for long-lived SSL connections. - :param bool use_binary_mode: Messages are passed as bytearray instead of string to callbacks. - - """ - - # pylint: disable=too-many-arguments,too-many-instance-attributes, not-callable, invalid-name, no-member - def __init__( - self, - broker, - port=None, - username=None, - password=None, - client_id=None, - is_ssl=True, - keep_alive=60, - socket_pool=None, - ssl_context=None, - use_binary_mode=False, - ): - - self._socket_pool = socket_pool - self._ssl_context = ssl_context - self._sock = None - self._backwards_compatible_sock = False - self._use_binary_mode = use_binary_mode - - self.keep_alive = keep_alive - self._user_data = None - self._is_connected = False - self._msg_size_lim = MQTT_MSG_SZ_LIM - self._pid = 0 - self._timestamp = 0 - self.logger = None - - self.broker = broker - self._username = username - self._password = password - if ( - self._password and len(password.encode("utf-8")) > MQTT_TOPIC_LENGTH_LIMIT - ): # [MQTT-3.1.3.5] - raise MMQTTException("Password length is too large.") - - self.port = MQTT_TCP_PORT - if is_ssl: - self.port = MQTT_TLS_PORT - if port: - self.port = port - - # define client identifer - if client_id: - # user-defined client_id MAY allow client_id's > 23 bytes or - # non-alpha-numeric characters - self.client_id = client_id - else: - # assign a unique client_id - self.client_id = "cpy{0}{1}".format( - randint(0, int(time.monotonic() * 100) % 1000), randint(0, 99) - ) - # generated client_id's enforce spec.'s length rules - if len(self.client_id.encode("utf-8")) > 23 or not self.client_id: - raise ValueError("MQTT Client ID must be between 1 and 23 bytes") - - # LWT - self._lw_topic = None - self._lw_qos = 0 - self._lw_topic = None - self._lw_msg = None - self._lw_retain = False - - # List of subscribed topics, used for tracking - self._subscribed_topics = [] - self._on_message_filtered = MQTTMatcher() - - # Default topic callback methods - self._on_message = None - self.on_connect = None - self.on_disconnect = None - self.on_publish = None - self.on_subscribe = None - self.on_unsubscribe = None - - # pylint: disable=too-many-branches - def _get_connect_socket(self, host, port, *, timeout=1): - """Obtains a new socket and connects to a broker. - - :param str host: Desired broker hostname - :param int port: Desired broker port - :param int timeout: Desired socket timeout - """ - # For reconnections - check if we're using a socket already and close it - if self._sock: - self._sock.close() - self._sock = None - - # Legacy API - use the interface's socket instead of a passed socket pool - if self._socket_pool is None: - self._socket_pool = _default_sock - - # Legacy API - fake the ssl context - if self._ssl_context is None: - self._ssl_context = _fake_context - - if not isinstance(port, int): - raise RuntimeError("Port must be an integer") - - if port == 8883 and not self._ssl_context: - raise RuntimeError( - "ssl_context must be set before using adafruit_mqtt for secure MQTT." - ) - - if self.logger is not None and port == MQTT_TLS_PORT: - self.logger.info( - "Establishing a SECURE SSL connection to {0}:{1}".format(host, port) - ) - elif self.logger is not None: - self.logger.info( - "Establishing an INSECURE connection to {0}:{1}".format(host, port) - ) - - addr_info = self._socket_pool.getaddrinfo( - host, port, 0, self._socket_pool.SOCK_STREAM - )[0] - - sock = None - retry_count = 0 - while retry_count < 5 and sock is None: - retry_count += 1 - - try: - sock = self._socket_pool.socket(addr_info[0], addr_info[1]) - except OSError: - continue - - connect_host = addr_info[-1][0] - if port == 8883: - sock = self._ssl_context.wrap_socket(sock, server_hostname=host) - connect_host = host - sock.settimeout(timeout) - - try: - sock.connect((connect_host, port)) - except MemoryError: - sock.close() - sock = None - except OSError: - sock.close() - sock = None - - if sock is None: - raise RuntimeError("Repeated socket failures") - - self._backwards_compatible_sock = not hasattr(sock, "recv_into") - return sock - - def __enter__(self): - return self - - def __exit__(self, exception_type, exception_value, traceback): - self.deinit() - - def _sock_exact_recv(self, bufsize): - """Reads _exact_ number of bytes from the connected socket. Will only return - string with the exact number of bytes requested. - - The semantics of native socket receive is that it returns no more than the - specified number of bytes (i.e. max size). However, it makes no guarantees in - terms of the minimum size of the buffer, which could be 1 byte. This is a - wrapper for socket recv() to ensure that no less than the expected number of - bytes is returned or trigger a timeout exception. - - :param int bufsize: number of bytes to receive - """ - stamp = time.monotonic() - rc = self._sock.recv(bufsize) - to_read = bufsize - len(rc) - assert to_read >= 0 - read_timeout = self.keep_alive - while to_read > 0: - recv = self._sock.recv(to_read) - to_read -= len(recv) - rc += recv - if time.monotonic() - stamp > read_timeout: - raise MMQTTException( - "Unable to receive {} bytes within {} seconds.".format( - to_read, read_timeout - ) - ) - return rc - - def deinit(self): - """De-initializes the MQTT client and disconnects from the mqtt broker.""" - self.disconnect() - - @property - def mqtt_msg(self): - """Returns maximum MQTT payload and topic size.""" - return self._msg_size_lim, MQTT_TOPIC_LENGTH_LIMIT - - @mqtt_msg.setter - def mqtt_msg(self, msg_size): - """Sets the maximum MQTT message payload size. - - :param int msg_size: Maximum MQTT payload size. - """ - if msg_size < MQTT_MSG_MAX_SZ: - self._msg_size_lim = msg_size - - def will_set(self, topic=None, payload=None, qos=0, retain=False): - """Sets the last will and testament properties. MUST be called before `connect()`. - - :param str topic: MQTT Broker topic. - :param int|float|str payload: Last will disconnection payload. - payloads of type int & float are converted to a string. - :param int qos: Quality of Service level, defaults to - zero. Conventional options are ``0`` (send at most once), ``1`` - (send at least once), or ``2`` (send exactly once). - - .. note:: Only options ``1`` or ``0`` are QoS levels supported by this library. - :param bool retain: Specifies if the payload is to be retained when - it is published. - """ - if self.logger is not None: - self.logger.debug("Setting last will properties") - self._valid_qos(qos) - if self._is_connected: - raise MMQTTException("Last Will should only be called before connect().") - if payload is None: - payload = "" - if isinstance(payload, (int, float, str)): - payload = str(payload).encode() - else: - raise MMQTTException("Invalid message data type.") - self._lw_qos = qos - self._lw_topic = topic - self._lw_msg = payload - self._lw_retain = retain - - def add_topic_callback(self, mqtt_topic, callback_method): - """Registers a callback_method for a specific MQTT topic. - - :param str mqtt_topic: MQTT topic identifier. - :param function callback_method: The callback method. - """ - if mqtt_topic is None or callback_method is None: - raise ValueError("MQTT topic and callback method must both be defined.") - self._on_message_filtered[mqtt_topic] = callback_method - - def remove_topic_callback(self, mqtt_topic): - """Removes a registered callback method. - - :param str mqtt_topic: MQTT topic identifier string. - """ - if mqtt_topic is None: - raise ValueError("MQTT Topic must be defined.") - try: - del self._on_message_filtered[mqtt_topic] - except KeyError: - raise KeyError( - "MQTT topic callback not added with add_topic_callback." - ) from None - - @property - def on_message(self): - """Called when a new message has been received on a subscribed topic. - - Expected method signature is ``on_message(client, topic, message)`` - """ - return self._on_message - - @on_message.setter - def on_message(self, method): - self._on_message = method - - def _handle_on_message(self, client, topic, message): - matched = False - if topic is not None: - for callback in self._on_message_filtered.iter_match(topic): - callback(client, topic, message) # on_msg with callback - matched = True - - if not matched and self.on_message: # regular on_message - self.on_message(client, topic, message) - - def username_pw_set(self, username, password=None): - """Set client's username and an optional password. - - :param str username: Username to use with your MQTT broker. - :param str password: Password to use with your MQTT broker. - - """ - if self._is_connected: - raise MMQTTException("This method must be called before connect().") - self._username = username - if password is not None: - self._password = password - - # pylint: disable=too-many-branches, too-many-statements, too-many-locals - def connect(self, clean_session=True, host=None, port=None, keep_alive=None): - """Initiates connection with the MQTT Broker. - - :param bool clean_session: Establishes a persistent session. - :param str host: Hostname or IP address of the remote broker. - :param int port: Network port of the remote broker. - :param int keep_alive: Maximum period allowed for communication, in seconds. - - """ - if host: - self.broker = host - if port: - self.port = port - if keep_alive: - self.keep_alive = keep_alive - - if self.logger is not None: - self.logger.debug("Attempting to establish MQTT connection...") - - # Get a new socket - self._sock = self._get_connect_socket(self.broker, self.port) - - # Fixed Header - fixed_header = bytearray([0x10]) - - # NOTE: Variable header is - # MQTT_HDR_CONNECT = bytearray(b"\x04MQTT\x04\x02\0\0") - # because final 4 bytes are 4, 2, 0, 0 - var_header = MQTT_HDR_CONNECT - var_header[6] = clean_session << 1 - - # Set up variable header and remaining_length - remaining_length = 12 + len(self.client_id.encode("utf-8")) - if self._username: - remaining_length += ( - 2 - + len(self._username.encode("utf-8")) - + 2 - + len(self._password.encode("utf-8")) - ) - var_header[6] |= 0xC0 - if self.keep_alive: - assert self.keep_alive < MQTT_TOPIC_LENGTH_LIMIT - var_header[7] |= self.keep_alive >> 8 - var_header[8] |= self.keep_alive & 0x00FF - if self._lw_topic: - remaining_length += ( - 2 + len(self._lw_topic.encode("utf-8")) + 2 + len(self._lw_msg) - ) - var_header[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3 - var_header[6] |= self._lw_retain << 5 - - # Remaining length calculation - large_rel_length = False - if remaining_length > 0x7F: - large_rel_length = True - # Calculate Remaining Length [2.2.3] - while remaining_length > 0: - encoded_byte = remaining_length % 0x80 - remaining_length = remaining_length // 0x80 - # if there is more data to encode, set the top bit of the byte - if remaining_length > 0: - encoded_byte |= 0x80 - fixed_header.append(encoded_byte) - if large_rel_length: - fixed_header.append(0x00) - else: - fixed_header.append(remaining_length) - fixed_header.append(0x00) - - if self.logger is not None: - self.logger.debug("Sending CONNECT to broker...") - self.logger.debug( - "Fixed Header: %s\nVariable Header: %s", fixed_header, var_header - ) - self._sock.send(fixed_header) - self._sock.send(var_header) - # [MQTT-3.1.3-4] - self._send_str(self.client_id) - if self._lw_topic: - # [MQTT-3.1.3-11] - self._send_str(self._lw_topic) - self._send_str(self._lw_msg) - if self._username is None: - self._username = None - else: - self._send_str(self._username) - self._send_str(self._password) - if self.logger is not None: - self.logger.debug("Receiving CONNACK packet from broker") - while True: - op = self._wait_for_msg() - if op == 32: - rc = self._sock_exact_recv(3) - assert rc[0] == 0x02 - if rc[2] != 0x00: - raise MMQTTException(CONNACK_ERRORS[rc[2]]) - self._is_connected = True - result = rc[0] & 1 - if self.on_connect is not None: - self.on_connect(self, self._user_data, result, rc[2]) - return result - - def disconnect(self): - """Disconnects the MiniMQTT client from the MQTT broker.""" - self.is_connected() - if self.logger is not None: - self.logger.debug("Sending DISCONNECT packet to broker") - try: - self._sock.send(MQTT_DISCONNECT) - except RuntimeError as e: - if self.logger is not None: - self.logger.warning("Unable to send DISCONNECT packet: {}".format(e)) - if self.logger is not None: - self.logger.debug("Closing socket") - self._sock.close() - self._is_connected = False - self._subscribed_topics = [] - if self.on_disconnect is not None: - self.on_disconnect(self, self._user_data, 0) - - def ping(self): - """Pings the MQTT Broker to confirm if the broker is alive or if - there is an active network connection. - Returns response codes of any messages received while waiting for PINGRESP. - """ - self.is_connected() - if self.logger is not None: - self.logger.debug("Sending PINGREQ") - self._sock.send(MQTT_PINGREQ) - ping_timeout = self.keep_alive - stamp = time.monotonic() - rc, rcs = None, [] - while rc != MQTT_PINGRESP: - rc = self._wait_for_msg() - if rc: - rcs.append(rc) - if time.monotonic() - stamp > ping_timeout: - raise MMQTTException("PINGRESP not returned from broker.") - return rcs - - # pylint: disable=too-many-branches, too-many-statements - def publish(self, topic, msg, retain=False, qos=0): - """Publishes a message to a topic provided. - - :param str topic: Unique topic identifier. - :param str|int|float|bytes msg: Data to send to the broker. - :param bool retain: Whether the message is saved by the broker. - :param int qos: Quality of Service level for the message, defaults to zero. - - """ - self.is_connected() - self._valid_topic(topic) - if "+" in topic or "#" in topic: - raise MMQTTException("Publish topic can not contain wildcards.") - # check msg/qos kwargs - if msg is None: - raise MMQTTException("Message can not be None.") - if isinstance(msg, (int, float)): - msg = str(msg).encode("ascii") - elif isinstance(msg, str): - msg = str(msg).encode("utf-8") - elif isinstance(msg, bytes): - pass - else: - raise MMQTTException("Invalid message data type.") - if len(msg) > MQTT_MSG_MAX_SZ: - raise MMQTTException("Message size larger than %d bytes." % MQTT_MSG_MAX_SZ) - assert ( - 0 <= qos <= 1 - ), "Quality of Service Level 2 is unsupported by this library." - - # fixed header. [3.3.1.2], [3.3.1.3] - pub_hdr_fixed = bytearray([0x30 | retain | qos << 1]) - - # variable header = 2-byte Topic length (big endian) - pub_hdr_var = bytearray(struct.pack(">H", len(topic.encode("utf-8")))) - pub_hdr_var.extend(topic.encode("utf-8")) # Topic name - - remaining_length = 2 + len(msg) + len(topic.encode("utf-8")) - if qos > 0: - # packet identifier where QoS level is 1 or 2. [3.3.2.2] - remaining_length += 2 - self._pid = self._pid + 1 if self._pid < 0xFFFF else 1 - pub_hdr_var.append(self._pid >> 8) - pub_hdr_var.append(self._pid & 0xFF) - - # Calculate remaining length [2.2.3] - if remaining_length > 0x7F: - while remaining_length > 0: - encoded_byte = remaining_length % 0x80 - remaining_length = remaining_length // 0x80 - if remaining_length > 0: - encoded_byte |= 0x80 - pub_hdr_fixed.append(encoded_byte) - else: - pub_hdr_fixed.append(remaining_length) - - if self.logger is not None: - self.logger.debug( - "Sending PUBLISH\nTopic: %s\nMsg: %s\ - \nQoS: %d\nRetain? %r", - topic, - msg, - qos, - retain, - ) - self._sock.send(pub_hdr_fixed) - self._sock.send(pub_hdr_var) - self._sock.send(msg) - if qos == 0 and self.on_publish is not None: - self.on_publish(self, self._user_data, topic, self._pid) - if qos == 1: - while True: - op = self._wait_for_msg() - if op == 0x40: - sz = self._sock_exact_recv(1) - assert sz == b"\x02" - rcv_pid = self._sock_exact_recv(2) - rcv_pid = rcv_pid[0] << 0x08 | rcv_pid[1] - if self._pid == rcv_pid: - if self.on_publish is not None: - self.on_publish(self, self._user_data, topic, rcv_pid) - return - - def subscribe(self, topic, qos=0): - """Subscribes to a topic on the MQTT Broker. - This method can subscribe to one topics or multiple topics. - - :param str|tuple|list topic: Unique MQTT topic identifier string. If - this is a `tuple`, then the tuple should - contain topic identifier string and qos - level integer. If this is a `list`, then - each list element should be a tuple containing - a topic identifier string and qos level integer. - :param int qos: Quality of Service level for the topic, defaults to - zero. Conventional options are ``0`` (send at most once), ``1`` - (send at least once), or ``2`` (send exactly once). - - """ - self.is_connected() - topics = None - if isinstance(topic, tuple): - topic, qos = topic - self._valid_topic(topic) - self._valid_qos(qos) - if isinstance(topic, str): - self._valid_topic(topic) - self._valid_qos(qos) - topics = [(topic, qos)] - if isinstance(topic, list): - topics = [] - for t, q in topic: - self._valid_qos(q) - self._valid_topic(t) - topics.append((t, q)) - # Assemble packet - packet_length = 2 + (2 * len(topics)) + (1 * len(topics)) - packet_length += sum(len(topic.encode("utf-8")) for topic, qos in topics) - packet_length_byte = packet_length.to_bytes(1, "big") - self._pid = self._pid + 1 if self._pid < 0xFFFF else 1 - packet_id_bytes = self._pid.to_bytes(2, "big") - # Packet with variable and fixed headers - packet = MQTT_SUB + packet_length_byte + packet_id_bytes - # attaching topic and QOS level to the packet - for t, q in topics: - topic_size = len(t.encode("utf-8")).to_bytes(2, "big") - qos_byte = q.to_bytes(1, "big") - packet += topic_size + t.encode() + qos_byte - if self.logger is not None: - for t, q in topics: - self.logger.debug("SUBSCRIBING to topic %s with QoS %d", t, q) - self._sock.send(packet) - while True: - op = self._wait_for_msg() - if op == 0x90: - rc = self._sock_exact_recv(4) - assert rc[1] == packet[2] and rc[2] == packet[3] - if rc[3] == 0x80: - raise MMQTTException("SUBACK Failure!") - for t, q in topics: - if self.on_subscribe is not None: - self.on_subscribe(self, self._user_data, t, q) - self._subscribed_topics.append(t) - return - - def unsubscribe(self, topic): - """Unsubscribes from a MQTT topic. - - :param str|list topic: Unique MQTT topic identifier string or list. - - """ - topics = None - if isinstance(topic, str): - self._valid_topic(topic) - topics = [(topic)] - if isinstance(topic, list): - topics = [] - for t in topic: - self._valid_topic(t) - topics.append((t)) - for t in topics: - if t not in self._subscribed_topics: - raise MMQTTException( - "Topic must be subscribed to before attempting unsubscribe." - ) - # Assemble packet - packet_length = 2 + (2 * len(topics)) - packet_length += sum(len(topic.encode("utf-8")) for topic in topics) - packet_length_byte = packet_length.to_bytes(1, "big") - self._pid = self._pid + 1 if self._pid < 0xFFFF else 1 - packet_id_bytes = self._pid.to_bytes(2, "big") - packet = MQTT_UNSUB + packet_length_byte + packet_id_bytes - for t in topics: - topic_size = len(t.encode("utf-8")).to_bytes(2, "big") - packet += topic_size + t.encode() - if self.logger is not None: - for t in topics: - self.logger.debug("UNSUBSCRIBING from topic %s", t) - self._sock.send(packet) - if self.logger is not None: - self.logger.debug("Waiting for UNSUBACK...") - while True: - op = self._wait_for_msg() - if op == 176: - rc = self._sock_exact_recv(3) - assert rc[0] == 0x02 - # [MQTT-3.32] - assert rc[1] == packet_id_bytes[0] and rc[2] == packet_id_bytes[1] - for t in topics: - if self.on_unsubscribe is not None: - self.on_unsubscribe(self, self._user_data, t, self._pid) - self._subscribed_topics.remove(t) - return - - def reconnect(self, resub_topics=True): - """Attempts to reconnect to the MQTT broker. - - :param bool resub_topics: Resubscribe to previously subscribed topics. - - """ - if self.logger is not None: - self.logger.debug("Attempting to reconnect with MQTT broker") - self.connect() - if self.logger is not None: - self.logger.debug("Reconnected with broker") - if resub_topics: - if self.logger is not None: - self.logger.debug( - "Attempting to resubscribe to previously subscribed topics." - ) - subscribed_topics = self._subscribed_topics.copy() - self._subscribed_topics = [] - while subscribed_topics: - feed = subscribed_topics.pop() - self.subscribe(feed) - - def loop(self, timeout=1): - """Non-blocking message loop. Use this method to - check incoming subscription messages. - Returns response codes of any messages received. - - :param int timeout: Socket timeout, in seconds. - - """ - if self._timestamp == 0: - self._timestamp = time.monotonic() - current_time = time.monotonic() - if current_time - self._timestamp >= self.keep_alive: - self._timestamp = 0 - # Handle KeepAlive by expecting a PINGREQ/PINGRESP from the server - if self.logger is not None: - self.logger.debug( - "KeepAlive period elapsed - requesting a PINGRESP from the server..." - ) - rcs = self.ping() - return rcs - self._sock.settimeout(timeout) - rc = self._wait_for_msg() - return [rc] if rc else None - - def _wait_for_msg(self, timeout=0.1): - """Reads and processes network events.""" - # CPython socket module contains a timeout attribute - if hasattr(self._socket_pool, "timeout"): - try: - res = self._sock_exact_recv(1) - except self._socket_pool.timeout: - return None - else: # socketpool, esp32spi - try: - res = self._sock_exact_recv(1) - except OSError as error: - if error.errno == errno.ETIMEDOUT: - # raised by a socket timeout if 0 bytes were present - return None - raise MMQTTException from error - - # Block while we parse the rest of the response - self._sock.settimeout(timeout) - if res in [None, b""]: - # If we get here, it means that there is nothing to be received - return None - if res[0] == MQTT_PINGRESP: - if self.logger is not None: - self.logger.debug("Got PINGRESP") - sz = self._sock_exact_recv(1)[0] - if sz != 0x00: - raise MMQTTException( - "Unexpected PINGRESP returned from broker: {}.".format(sz) - ) - return MQTT_PINGRESP - if res[0] & 0xF0 != 0x30: - return res[0] - sz = self._recv_len() - # topic length MSB & LSB - topic_len = self._sock_exact_recv(2) - topic_len = (topic_len[0] << 8) | topic_len[1] - topic = self._sock_exact_recv(topic_len) - topic = str(topic, "utf-8") - sz -= topic_len + 2 - pid = 0 - if res[0] & 0x06: - pid = self._sock_exact_recv(2) - pid = pid[0] << 0x08 | pid[1] - sz -= 0x02 - # read message contents - raw_msg = self._sock_exact_recv(sz) - msg = raw_msg if self._use_binary_mode else str(raw_msg, "utf-8") - if self.logger is not None: - self.logger.debug( - "Receiving SUBSCRIBE \nTopic: %s\nMsg: %s\n", topic, raw_msg - ) - self._handle_on_message(self, topic, msg) - if res[0] & 0x06 == 0x02: - pkt = bytearray(b"\x40\x02\0\0") - struct.pack_into("!H", pkt, 2, pid) - self._sock.send(pkt) - elif res[0] & 6 == 4: - assert 0 - return res[0] - - def _recv_len(self): - """Unpack MQTT message length.""" - n = 0 - sh = 0 - b = bytearray(1) - while True: - b = self._sock_exact_recv(1)[0] - n |= (b & 0x7F) << sh - if not b & 0x80: - return n - sh += 7 - - def _recv_into(self, buf, size=0): - """Backwards-compatible _recv_into implementation.""" - if self._backwards_compatible_sock: - size = len(buf) if size == 0 else size - b = self._sock.recv(size) - read_size = len(b) - buf[:read_size] = b - return read_size - return self._sock.recv_into(buf, size) - - def _sock_exact_recv(self, bufsize): - """Reads _exact_ number of bytes from the connected socket. Will only return - string with the exact number of bytes requested. - - The semantics of native socket receive is that it returns no more than the - specified number of bytes (i.e. max size). However, it makes no guarantees in - terms of the minimum size of the buffer, which could be 1 byte. This is a - wrapper for socket recv() to ensure that no less than the expected number of - bytes is returned or trigger a timeout exception. - - :param int bufsize: number of bytes to receive - - """ - if not self._backwards_compatible_sock: - # CPython/Socketpool Impl. - rc = bytearray(bufsize) - self._sock.recv_into(rc, bufsize) - else: # ESP32SPI Impl. - stamp = time.monotonic() - read_timeout = self.keep_alive - # This will timeout with socket timeout (not keepalive timeout) - rc = self._sock.recv(bufsize) - if not rc: - if self.logger is not None: - self.logger.debug("_sock_exact_recv timeout") - # If no bytes waiting, raise same exception as socketpool - raise OSError(errno.ETIMEDOUT) - # If any bytes waiting, try to read them all, - # or raise exception if wait longer than read_timeout - to_read = bufsize - len(rc) - assert to_read >= 0 - read_timeout = self.keep_alive - while to_read > 0: - recv = self._sock.recv(to_read) - to_read -= len(recv) - rc += recv - if time.monotonic() - stamp > read_timeout: - raise MMQTTException( - "Unable to receive {} bytes within {} seconds.".format( - to_read, read_timeout - ) - ) - return rc - - def _send_str(self, string): - """Encodes a string and sends it to a socket. - - :param str string: String to write to the socket. - - """ - if isinstance(string, str): - self._sock.send(struct.pack("!H", len(string.encode("utf-8")))) - self._sock.send(str.encode(string, "utf-8")) - else: - self._sock.send(struct.pack("!H", len(string))) - self._sock.send(string) - - @staticmethod - def _valid_topic(topic): - """Validates if topic provided is proper MQTT topic format. - - :param str topic: Topic identifier - - """ - if topic is None: - raise MMQTTException("Topic may not be NoneType") - # [MQTT-4.7.3-1] - if not topic: - raise MMQTTException("Topic may not be empty.") - # [MQTT-4.7.3-3] - if len(topic.encode("utf-8")) > MQTT_TOPIC_LENGTH_LIMIT: - raise MMQTTException("Topic length is too large.") - - @staticmethod - def _valid_qos(qos_level): - """Validates if the QoS level is supported by this library - - :param int qos_level: Desired QoS level. - - """ - if isinstance(qos_level, int): - if qos_level < 0 or qos_level > 2: - raise MMQTTException("QoS must be between 1 and 2.") - else: - raise MMQTTException("QoS must be an integer.") - - def is_connected(self): - """Returns MQTT client session status as True if connected, raises - a `MMQTTException` if `False`. - """ - if self._sock is None or self._is_connected is False: - raise MMQTTException("MiniMQTT is not connected.") - return self._is_connected - - # Logging - def enable_logger(self, logger, log_level=20): - """Enables library logging provided a logger object. - - :param logger: A python logger pacakge. - :param log_level: Numeric value of a logging level, defaults to INFO. - - """ - self.logger = logger.getLogger("log") - self.logger.setLevel(log_level) - - def disable_logger(self): - """Disables logging.""" - if not self.logger: - raise MMQTTException("Can not disable logger, no logger found.") - self.logger = None diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/adafruit_rtttl.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/adafruit_rtttl.py deleted file mode 100644 index 75665f1bb5..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/adafruit_rtttl.py +++ /dev/null @@ -1,204 +0,0 @@ -# The MIT License (MIT) -# -# Copyright (c) 2017, 2018 Scott Shawcroft for Adafruit Industries -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in -# all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -# THE SOFTWARE. -""" -`adafruit_rtttl` -==================================================== - -Play notes to a digialio pin using ring tone text transfer language (rtttl). - -* Author(s): Scott Shawcroft -""" - -__version__ = "2.4.2" -__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RTTTL" - -import sys -import time -import pwmio - -AUDIOIO_AVAILABLE = False -try: - import audioio - from adafruit_waveform import sine - - AUDIOIO_AVAILABLE = True - try: - import audiocore - except ImportError: - audiocore = audioio -except ImportError: - pass - -PIANO = { - "4c": 261.626, - "4c#": 277.183, - "4d": 293.665, - "4d#": 311.127, - "4e": 329.628, - "4f": 349.228, - "4f#": 369.994, - "4g": 391.995, - "4g#": 415.305, - "4a": 440, - "4a#": 466.164, - "4b": 493.883, - "5c": 523.251, - "5c#": 554.365, - "5d": 587.330, - "5d#": 622.254, - "5e": 659.255, - "5f": 698.456, - "5f#": 739.989, - "5g": 783.991, - "5g#": 830.609, - "5a": 880, - "5a#": 932.328, - "5b": 987.767, - "6c": 1046.50, - "6c#": 1108.73, - "6d": 1174.66, - "6d#": 1244.51, - "6e": 1318.51, - "6f": 1396.91, - "6f#": 1479.98, - "6g": 1567.98, - "6g#": 1661.22, - "6a": 1760, - "6a#": 1864.66, - "6b": 1975.53, - "7c": 2093, - "7c#": 2217.46, -} - - -def _parse_note(note, duration=2, octave="6"): - note = note.strip() - piano_note = None - note_duration = duration - if note[0].isdigit() and note[1].isdigit(): - note_duration = int(note[:2]) - piano_note = note[2] - elif note[0].isdigit(): - note_duration = int(note[0]) - piano_note = note[1] - else: - piano_note = note[0] - if "." in note: - note_duration *= 1.5 - if "#" in note: - piano_note += "#" - note_octave = octave - if note[-1].isdigit(): - note_octave = note[-1] - piano_note = note_octave + piano_note - return piano_note, note_duration - - -def _get_wave(tune, octave): - """Returns the proper waveform to play the song along with the minimum - frequency in the song. - """ - min_freq = 13000 - - for note in tune.split(","): - piano_note, _ = _parse_note(note, octave=octave) - if piano_note[-1] != "p" and PIANO[piano_note] < min_freq: - min_freq = PIANO[piano_note] - return sine.sine_wave(16000, min_freq), min_freq - - -# pylint: disable-msg=too-many-arguments -def _play_to_pin(tune, base_tone, min_freq, duration, octave, tempo): - """Using the prepared input send the notes to the pin - """ - pwm = isinstance(base_tone, pwmio.PWMOut) - for note in tune.split(","): - piano_note, note_duration = _parse_note(note, duration, octave) - if piano_note in PIANO: - if pwm: - base_tone.frequency = int(PIANO[piano_note]) - base_tone.duty_cycle = 2 ** 15 - else: - # AudioOut interface changed in CP 3.x - if sys.implementation.version[0] >= 3: - pitch = int(PIANO[piano_note]) - sine_wave = sine.sine_wave(16000, pitch) - sine_wave_sample = audiocore.RawSample(sine_wave) - base_tone.play(sine_wave_sample, loop=True) - else: - base_tone.frequency = int(16000 * (PIANO[piano_note] / min_freq)) - base_tone.play(loop=True) - - time.sleep(4 / note_duration * 60 / tempo) - if pwm: - base_tone.duty_cycle = 0 - else: - base_tone.stop() - time.sleep(0.02) - - -# pylint: disable-msg=too-many-arguments -def play(pin, rtttl, octave=None, duration=None, tempo=None): - """Play notes to a digialio pin using ring tone text transfer language (rtttl). - :param ~digitalio.DigitalInOut pin: the speaker pin - :param rtttl: string containing rtttl - :param int octave: represents octave number (default 6 starts at middle c) - :param int duration: length of notes (default 4 quarter note) - :param int tempo: how fast (default 63 beats per minute) - """ - _, defaults, tune = rtttl.lower().split(":") - for default in defaults.split(","): - if default[0] == "d" and not duration: - duration = int(default[2:]) - elif default[0] == "o" and not octave: - octave = default[2:] - elif default[0] == "b" and not tempo: - tempo = int(default[2:]) - if not octave: - octave = 6 - if not duration: - duration = 4 - if not tempo: - tempo = 63 - - base_tone = None - min_freq = 440 - if AUDIOIO_AVAILABLE: - wave, min_freq = _get_wave(tune, octave) - try: - # AudioOut interface changed in CP 3.x; a waveform if now pass - # directly to .play(), generated for each note in _play_to_pin() - if sys.implementation.version[0] >= 3: - base_tone = audioio.AudioOut(pin) - else: - base_tone = audioio.AudioOut(pin, wave) - except ValueError: - # No DAC on the pin so use PWM. - pass - - # Fall back to PWM - if not base_tone: - base_tone = pwmio.PWMOut(pin, duty_cycle=0, variable_frequency=True) - - _play_to_pin(tune, base_tone, min_freq, duration, octave, tempo) - - base_tone.deinit() diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/blynklib.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/blynklib.py deleted file mode 100644 index c174d91c7f..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/blynklib.py +++ /dev/null @@ -1,390 +0,0 @@ -# Copyright (c) 2019-2020 Anton Morozenko -# Copyright (c) 2015-2019 Volodymyr Shymanskyy. -# See the file LICENSE for copying permission. -# 宋义深 2021/4/25 Modified -__version__ = '0.2.6' - -import socketpool as socket -import ssl -import struct -import time -import wifi - -LOGO = """ - ___ __ __ - / _ )/ /_ _____ / /__ - / _ / / // / _ \\/ '_/ - /____/_/\\_, /_//_/_/\\_\\ - /___/ for Python v{}\n""".format(__version__) - - -def stub_log(*args): - pass - - -def ticks_ms(): - return int(time.time() * 1000) - - -def sleep_ms(ms): - time.sleep(ms // 1000) - - -class BlynkError(Exception): - pass - - -class RedirectError(Exception): - def __init__(self, server, port): - self.server = server - self.port = port - - -class Protocol(object): - MSG_RSP = 0 - MSG_LOGIN = 2 - MSG_PING = 6 - MSG_TWEET = 12 - MSG_EMAIL = 13 - MSG_NOTIFY = 14 - MSG_BRIDGE = 15 - MSG_HW_SYNC = 16 - MSG_INTERNAL = 17 - MSG_PROPERTY = 19 - MSG_HW = 20 - MSG_REDIRECT = 41 - MSG_HEAD_LEN = 5 - - STATUS_INVALID_TOKEN = 9 - STATUS_NO_DATA = 17 - STATUS_OK = 200 - VPIN_MAX_NUM = 32 - - _msg_id = 0 - - def _get_msg_id(self, **kwargs): - if 'msg_id' in kwargs: - return kwargs['msg_id'] - self._msg_id += 1 - return self._msg_id if self._msg_id <= 0xFFFF else 1 - - def _pack_msg(self, msg_type, *args, **kwargs): - data = ('\0'.join([str(curr_arg) for curr_arg in args])).encode('utf-8') - return struct.pack('!BHH', msg_type, self._get_msg_id(**kwargs), len(data)) + data - - def parse_response(self, rsp_data, msg_buffer): - msg_args = [] - try: - msg_type, msg_id, h_data = struct.unpack('!BHH', rsp_data[:self.MSG_HEAD_LEN]) - except Exception as p_err: - print(p_err) - raise BlynkError('Message parse error: {}'.format(p_err)) - if msg_id == 0: - raise BlynkError('invalid msg_id == 0') - elif h_data >= msg_buffer: - raise BlynkError('Command too long. Length = {}'.format(h_data)) - elif msg_type in (self.MSG_RSP, self.MSG_PING): - pass - elif msg_type in (self.MSG_HW, self.MSG_BRIDGE, self.MSG_INTERNAL, self.MSG_REDIRECT): - msg_body = rsp_data[self.MSG_HEAD_LEN: self.MSG_HEAD_LEN + h_data] - msg_args = [itm.decode('utf-8') for itm in msg_body.split(b'\0')] - else: - print('unknown') - raise BlynkError("Unknown message type: '{}'".format(msg_type)) - return msg_type, msg_id, h_data, msg_args - - def heartbeat_msg(self, heartbeat, rcv_buffer): - return self._pack_msg(self.MSG_INTERNAL, 'ver', __version__, 'buff-in', rcv_buffer, 'h-beat', heartbeat, - 'dev', 'python') - - def login_msg(self, token): - return self._pack_msg(self.MSG_LOGIN, token) - - def ping_msg(self): - return self._pack_msg(self.MSG_PING) - - def response_msg(self, *args, **kwargs): - return self._pack_msg(self.MSG_RSP, *args, **kwargs) - - def virtual_write_msg(self, v_pin, *val): - return self._pack_msg(self.MSG_HW, 'vw', v_pin, *val) - - def virtual_sync_msg(self, *pins): - return self._pack_msg(self.MSG_HW_SYNC, 'vr', *pins) - - def email_msg(self, to, subject, body): - return self._pack_msg(self.MSG_EMAIL, to, subject, body) - - def tweet_msg(self, msg): - return self._pack_msg(self.MSG_TWEET, msg) - - def notify_msg(self, msg): - return self._pack_msg(self.MSG_NOTIFY, msg) - - def set_property_msg(self, pin, prop, *val): - return self._pack_msg(self.MSG_PROPERTY, pin, prop, *val) - - def internal_msg(self, *args): - return self._pack_msg(self.MSG_INTERNAL, *args) - - -class Connection(Protocol): - SOCK_MAX_TIMEOUT = 5 - SOCK_TIMEOUT = 0 - SOCK_SSL_TIMEOUT = 1 - EAGAIN = 11 - ETIMEDOUT = 60 - RETRIES_TX_DELAY = 2 - RETRIES_TX_MAX_NUM = 3 - RECONNECT_SLEEP = 1 - TASK_PERIOD_RES = 50 - DISCONNECTED = 0 - CONNECTING = 1 - AUTHENTICATING = 2 - AUTHENTICATED = 3 - - _state = None - _socket = None - _socketPool = None - _last_rcv_time = 0 - _last_ping_time = 0 - _last_send_time = 0 - - def __init__(self, token, server='blynk-cloud.com', port=80, ssl_cert=None, heartbeat=10, rcv_buffer=1024, - log=stub_log): - self.token = token - self.server = server - self.port = port - self.heartbeat = heartbeat - self.rcv_buffer = rcv_buffer - self.log = log - self.ssl_cert = ssl_cert - - def send(self, data): - retries = self.RETRIES_TX_MAX_NUM - while retries > 0: - try: - retries -= 1 - self._last_send_time = ticks_ms() - return self._socket.send(data) - except (Exception): - sleep_ms(self.RETRIES_TX_DELAY) - - def receive(self, length, timeout): - d_buff = bytearray(length) - try: - self._socket.settimeout(timeout) - recv_length = self._socket.recv_into(d_buff) - d_buff = d_buff[:recv_length] - #print(self.parse_response(bytes(d_buff), self.rcv_buffer)) - return bytes(d_buff) - except (Exception) as err: - if 'timed out' in str(err): - return b'' - if str(self.EAGAIN) in str(err) or str(self.ETIMEDOUT) in str(err): - return b'' - raise - - def is_server_alive(self): - now = ticks_ms() - h_beat_ms = self.heartbeat * 1000 - rcv_delta = now - self._last_rcv_time - ping_delta = now - self._last_ping_time - send_delta = now - self._last_send_time - if rcv_delta > h_beat_ms + (h_beat_ms // 2): - return False - if (ping_delta > h_beat_ms // 10) and (send_delta > h_beat_ms or rcv_delta > h_beat_ms): - self.send(self.ping_msg()) - self.log('Heartbeat time: {}'.format(now)) - self._last_ping_time = now - return True - - def _get_socket(self): - try: - self._state = self.CONNECTING - self._socket = socket.SocketPool(wifi.radio).socket() - self._socketPool = socket.SocketPool(wifi.radio) - self._socket.connect(self._socketPool.getaddrinfo(self.server, self.port)[0][4]) - self._socket.settimeout(self.SOCK_TIMEOUT) - if self.ssl_cert: - # system default CA certificates case - if self.ssl_cert == "default": - self.ssl_cert = None - self.log('Using SSL socket...') - ssl_context = ssl.create_default_context(cafile=self.ssl_cert) - ssl_context.verify_mode = ssl.CERT_REQUIRED - self._socket.settimeout(self.SOCK_SSL_TIMEOUT) - self._socket = ssl_context.wrap_socket(sock=self._socket, server_hostname=self.server) - self.log('Connected to blynk server') - except Exception as g_exc: - print(g_exc) - raise BlynkError('Connection with the Blynk server failed: {}'.format(g_exc)) - - def _authenticate(self): - self._state = self.AUTHENTICATING - self.send(self.login_msg(self.token)) - rsp_data = self.receive(self.rcv_buffer, self.SOCK_MAX_TIMEOUT) - if not rsp_data: - raise BlynkError('Auth stage timeout') - msg_type, _, status, args = self.parse_response(rsp_data, self.rcv_buffer) - if status != self.STATUS_OK: - if status == self.STATUS_INVALID_TOKEN: - raise BlynkError('Invalid Auth Token') - if msg_type == self.MSG_REDIRECT: - raise RedirectError(*args) - raise BlynkError('Auth stage failed. Status={}'.format(status)) - self._state = self.AUTHENTICATED - - def _set_heartbeat(self): - self.send(self.heartbeat_msg(self.heartbeat, self.rcv_buffer)) - rcv_data = self.receive(self.rcv_buffer, self.SOCK_MAX_TIMEOUT) - if not rcv_data: - raise BlynkError('Heartbeat stage timeout') - _, _, status, _ = self.parse_response(rcv_data, self.rcv_buffer) - if status != self.STATUS_OK: - raise BlynkError('Set heartbeat returned code={}'.format(status)) - self.log('Heartbeat = {} sec. MaxCmdBuffer = {} bytes'.format(self.heartbeat, self.rcv_buffer)) - - def connected(self): - return True if self._state == self.AUTHENTICATED else False - - -class Blynk(Connection): - _CONNECT_TIMEOUT = 30 # 30sec - _VPIN_WILDCARD = '*' - _VPIN_READ = 'read v' - _VPIN_WRITE = 'write v' - _INTERNAL = 'internal_' - _CONNECT = 'connect' - _DISCONNECT = 'disconnect' - _VPIN_READ_ALL = '{}{}'.format(_VPIN_READ, _VPIN_WILDCARD) - _VPIN_WRITE_ALL = '{}{}'.format(_VPIN_WRITE, _VPIN_WILDCARD) - _events = {} - - def __init__(self, token, **kwargs): - Connection.__init__(self, token, **kwargs) - self._start_time = ticks_ms() - self._last_rcv_time = ticks_ms() - self._last_send_time = ticks_ms() - self._last_ping_time = ticks_ms() - self._state = self.DISCONNECTED - print(LOGO) - - def connect(self, timeout=_CONNECT_TIMEOUT): - end_time = time.time() + timeout - while not self.connected(): - if self._state == self.DISCONNECTED: - try: - self._get_socket() - print('[Connecting 1/5] Socket got.') - self._authenticate() - print('[Connecting 2/5] Authenticated.') - self._set_heartbeat() - print('[Connecting 3/5] Heartbeat sent.') - self._last_rcv_time = ticks_ms() - print('[Connecting 4/5] Last receive time set.') - self.log('Registered events: {}\n'.format(list(self._events.keys()))) - print('[Connecting 5/5] Events registered.') - self.call_handler(self._CONNECT) - return True - except BlynkError as b_err: - self.disconnect(b_err) - sleep_ms(self.TASK_PERIOD_RES) - except RedirectError as r_err: - self.disconnect() - self.server = r_err.server - self.port = r_err.port - sleep_ms(self.TASK_PERIOD_RES) - if time.time() >= end_time: - return False - - def disconnect(self, err_msg=None): - self.call_handler(self._DISCONNECT) - if self._socket: - self._socket.close() - self._state = self.DISCONNECTED - if err_msg: - self.log('[ERROR]: {}\nConnection closed'.format(err_msg)) - self._msg_id = 0 - time.sleep(self.RECONNECT_SLEEP) - - def virtual_write(self, v_pin, *val): - return self.send(self.virtual_write_msg(v_pin, *val)) - - def virtual_sync(self, *v_pin): - return self.send(self.virtual_sync_msg(*v_pin)) - - def email(self, to, subject, body): - return self.send(self.email_msg(to, subject, body)) - - def tweet(self, msg): - return self.send(self.tweet_msg(msg)) - - def notify(self, msg): - return self.send(self.notify_msg(msg)) - - def set_property(self, v_pin, property_name, *val): - return self.send(self.set_property_msg(v_pin, property_name, *val)) - - def internal(self, *args): - return self.send(self.internal_msg(*args)) - - def handle_event(blynk, event_name): - class Deco(object): - def __init__(self, func): - self.func = func - # wildcard 'read V*' and 'write V*' events handling - if str(event_name).lower() in (blynk._VPIN_READ_ALL, blynk._VPIN_WRITE_ALL): - event_base_name = str(event_name).split(blynk._VPIN_WILDCARD)[0] - for i in range(blynk.VPIN_MAX_NUM + 1): - blynk._events['{}{}'.format(event_base_name.lower(), i)] = func - else: - blynk._events[str(event_name).lower()] = func - - def __call__(self): - return self.func() - - return Deco - - def call_handler(self, event, *args, **kwargs): - if event in self._events.keys(): - self.log("Event: ['{}'] -> {}".format(event, args)) - self._events[event](*args, **kwargs) - - def process(self, msg_type, msg_id, msg_len, msg_args): - if msg_type == self.MSG_RSP: - self.log('Response status: {}'.format(msg_len)) - elif msg_type == self.MSG_PING: - self.send(self.response_msg(self.STATUS_OK, msg_id=msg_id)) - elif msg_type in (self.MSG_HW, self.MSG_BRIDGE, self.MSG_INTERNAL): - if msg_type == self.MSG_INTERNAL: - self.call_handler("{}{}".format(self._INTERNAL, msg_args[0]), msg_args[1:]) - elif len(msg_args) >= 3 and msg_args[0] == 'vw': - self.call_handler("{}{}".format(self._VPIN_WRITE, msg_args[1]), int(msg_args[1]), msg_args[2:]) - elif len(msg_args) == 2 and msg_args[0] == 'vr': - self.call_handler("{}{}".format(self._VPIN_READ, msg_args[1]), int(msg_args[1])) - - def read_response(self, timeout=0.5): - end_time = time.time() + timeout - while time.time() <= end_time: - rsp_data = self.receive(self.rcv_buffer, self.SOCK_TIMEOUT) - if rsp_data: - self._last_rcv_time = ticks_ms() - msg_type, msg_id, h_data, msg_args = self.parse_response(rsp_data, self.rcv_buffer) - self.process(msg_type, msg_id, h_data, msg_args) - - def run(self): - if not self.connected(): - self.connect() - else: - try: - self.read_response(timeout=self.SOCK_TIMEOUT) - if not self.is_server_alive(): - self.disconnect('Blynk server is offline') - except KeyboardInterrupt: - raise - except BlynkError as b_err: - self.log(b_err) - self.disconnect() - except Exception as g_exc: - self.log(g_exc) diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/blynktimer.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/blynktimer.py deleted file mode 100644 index 8fe26ad3f6..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/blynktimer.py +++ /dev/null @@ -1,134 +0,0 @@ -# Copyright (c) 2019-2020 Anton Morozenko -# 宋义深 2021/4/25 Modified -""" -Polling timers for functions. -Registers timers and performs run once or periodical function execution after defined time intervals. -""" -# select.select call used as polling waiter where it is possible -# cause time.sleep sometimes may load CPU up to 100% with small polling wait interval -try: - # cpython - import time - import select - - polling_wait = lambda x: select.select([], [], [], x) - polling_wait(0.01) -except OSError: - # windows case where select.select call fails - polling_wait = lambda x: time.sleep(x) - -except ImportError: - # micropython - import time - - try: - from uselect import select as s_select - - polling_wait = lambda x: s_select([], [], [], x) - except ImportError: - # case when micropython port does not support select.select - polling_wait = lambda x: time.sleep(x) - -WAIT_SEC = 0.05 -MAX_TIMERS = 16 -DEFAULT_INTERVAL = 10 - - -class TimerError(Exception): - pass - - -class Timer(object): - timers = {} - - def __init__(self, no_timers_err=True): - self.no_timers_err = no_timers_err - - def _get_func_name(self, obj): - """retrieves a suitable name for a function""" - if hasattr(obj, 'func'): - # handles nested decorators - return self._get_func_name(obj.func) - # simply returns 'timer' if on port without function attrs - return getattr(obj, '__name__', 'timer') - - def register(blynk, *args, **kwargs): - # kwargs with defaults are used cause PEP 3102 no supported by Python2 - interval = kwargs.pop('interval', DEFAULT_INTERVAL) - run_once = kwargs.pop('run_once', False) - stopped = kwargs.pop('stopped', False) - - class Deco(object): - def __init__(self, func): - self.func = func - if len(list(Timer.timers.keys())) >= MAX_TIMERS: - raise TimerError('Max allowed timers num={}'.format(MAX_TIMERS)) - _timer = _Timer(interval, func, run_once, stopped, *args, **kwargs) - Timer.timers['{}_{}'.format(len(list(Timer.timers.keys())), blynk._get_func_name(func))] = _timer - - def __call__(self, *f_args, **f_kwargs): - return self.func(*f_args, **f_kwargs) - - return Deco - - @staticmethod - def stop(t_id): - timer = Timer.timers.get(t_id, None) - if timer is None: - raise TimerError('Timer id={} not found'.format(t_id)) - Timer.timers[t_id].stopped = True - - @staticmethod - def start(t_id): - timer = Timer.timers.get(t_id, None) - if timer is None: - raise TimerError('Timer id={} not found'.format(t_id)) - Timer.timers[t_id].stopped = False - Timer.timers[t_id].fire_time = None - Timer.timers[t_id].fire_time_prev = None - - @staticmethod - def is_stopped(t_id): - timer = Timer.timers.get(t_id, None) - if timer is None: - raise TimerError('Timer id={} not found'.format(t_id)) - return timer.stopped - - def get_timers(self): - states = {True: 'Stopped', False: 'Running'} - return {k: states[v.stopped] for k, v in self.timers.items()} - - def run(self): - polling_wait(WAIT_SEC) - timers_intervals = [curr_timer.run() for curr_timer in Timer.timers.values() if not curr_timer.stopped] - if not timers_intervals and self.no_timers_err: - raise TimerError('Running timers not found') - return timers_intervals - - -class _Timer(object): - def __init__(self, interval, deco, run_once, stopped, *args, **kwargs): - self.interval = interval - self.deco = deco - self.args = args - self.run_once = run_once - self.kwargs = kwargs - self.fire_time = None - self.fire_time_prev = None - self.stopped = stopped - - def run(self): - timer_real_interval = 0 - if self.fire_time is None: - self.fire_time = time.time() + self.interval - if self.fire_time_prev is None: - self.fire_time_prev = time.time() - curr_time = time.time() - if curr_time >= self.fire_time: - self.deco(*self.args, **self.kwargs) - if self.run_once: - self.stopped = True - timer_real_interval = curr_time - self.fire_time_prev - self.fire_time_prev = self.fire_time - self.fire_time = curr_time + self.interval - return timer_real_interval diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/button.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/button.py deleted file mode 100644 index e7b3087802..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/button.py +++ /dev/null @@ -1,49 +0,0 @@ -""" -Button - -CircuitPython library for Button - MixGoCE -======================================================= -Small Cabbage -20210721 -dahanzimin -20210423 -""" - -import time -import board -from digitalio import DigitalInOut, Direction, Pull - -class Button(): - - def __init__(self, pin): - self.pin = DigitalInOut(pin) - self.pin.direction = Direction.INPUT - self.pin.pull = Pull.UP - self.flag = True - - def is_pressed(self): - return self.pin.value == False - - def get_presses(self, delay=1): - last_time, presses = time.monotonic(), 0 - while time.monotonic()< last_time + delay: - time.sleep(0.05) - if self.was_pressed(): - presses += 1 - return presses - - def was_pressed(self): - if self.pin.value != self.flag: - self.flag = self.pin.value - time.sleep(0.01) - if self.flag == False: - return True - else: - return False - -button_A1 = Button(board.IO14) -button_A2 = Button(board.IO21) -button_A3 = Button(board.IO36) -button_A4 = Button(board.IO37) -button_B1 = Button(board.IO0) -button_B2 = Button(board.IO35) \ No newline at end of file diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/infrared.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/infrared.py deleted file mode 100644 index 508b9dd7f8..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/infrared.py +++ /dev/null @@ -1,41 +0,0 @@ -""" -Infrared Sensor - -CircuitPython library for Infrared Sensor - MixGoCE -======================================================= - -Small Cabbage -20210721 -dahanzimin -20210423 -""" - -from mixgoce import version -import board -from pwmio import PWMOut - -if version:#new - def near(x, val=10000): - from analogio import AnalogIn - - pwm = PWMOut(board.IO39, frequency=50000, duty_cycle=65535) - if x == 'left': - IR=AnalogIn(board.IO3) - reaction = IR.value-10000 #fix - if x == 'right': - IR=AnalogIn(board.IO16) - reaction = IR.value - IR.deinit() - pwm.deinit() - return reaction > val -else:#old - def near(x, val=10000): - from digitalio import DigitalInOut, Direction - - IR = DigitalInOut(board.IO38) - IR.direction = Direction.INPUT - pwm = PWMOut(board.IO39, frequency=53000, duty_cycle=100) - reaction = IR.value - IR.deinit() - pwm.deinit() - return reaction \ No newline at end of file diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/irremote.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/irremote.py deleted file mode 100644 index 30445e3503..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/irremote.py +++ /dev/null @@ -1,111 +0,0 @@ -""" -IR Remote - -CircuitPython library for IR Remote - MixGoCE -======================================================= - -Small Cabbage -20210811 -""" - -import pulseio -import board -import adafruit_irremote - -first_init =True - -def encode2hex(code): - res = "0X" - for c in code: - c = hex(c)[2:] - if len(c) == 1: - c = "0" + c - res = res + c.upper() - return res - -def decode2list(string): - res = [0, 0, 0, 0] - prefix = "0" * (8 - len(string[2:])) - string = prefix + string[2:] - for i in range(4): - res[i] = int(string[2*i:2*(i+1)], 16) - return res - -def ir_receive_hex(pin=board.IO38, flag=False): - global first_init - global decoder - global pulsein - - if first_init : - pulsein = pulseio.PulseIn(pin, maxlen=120, idle_state=True) - decoder = adafruit_irremote.GenericDecode() - first_init=False - - pulses = decoder.read_pulses(pulsein, blocking=False) - if pulses: - try: - # print("Heard", len(pulses), "Pulses:", pulses) - code = decoder.decode_bits(pulses) - res = encode2hex(code) - if flag: - print("Decoded:", res) - # pulsein.deinit() - return res - except adafruit_irremote.IRNECRepeatException: # unusual short code! - print("NEC repeat!") - except adafruit_irremote.IRDecodeException as e: # failed to decode - if flag: - print("Failed to decode: ", e.args) - # except TypeError as e: - # pass - # finally: - # pulsein.deinit() - -def ir_receive(pin=board.IO38,flag=False): - global first_init - global decoder - global pulsein - - if first_init : - pulsein = pulseio.PulseIn(pin, maxlen=120, idle_state=True) - decoder = adafruit_irremote.GenericDecode() - first_init=False - - pulses = decoder.read_pulses(pulsein, blocking=False) - if pulses: - try: - code = decoder.decode_bits(pulses) - if flag: - print("Decoded:", code) - return code - except adafruit_irremote.IRNECRepeatException: # unusual short code! - print("NEC repeat!") - except adafruit_irremote.IRDecodeException as e: # failed to decode - if flag: - print("Failed to decode: ", e.args) - -def ir_send(content=[255, 0, 0, 0], pin=board.IO39): - pulseout = pulseio.PulseOut(pin, frequency=38000, duty_cycle=2 ** 15) - # Create an encoder that will take numbers and turn them into NEC IR pulses - encoder = adafruit_irremote.GenericTransmit( - header=[9500, 4500], one=[550, 550], zero=[550, 1700], trail=550 - ) - encoder.transmit(pulseout, content) - print("IR signal sent!") - pulseout.deinit() - -def ir_send_hex(content="0XFF000000", pin=board.IO39): # [255, 0, 0, 0] - pulseout = pulseio.PulseOut(pin, frequency=38000, duty_cycle=2 ** 15) - # Create an encoder that will take numbers and turn them into NEC IR pulses - encoder = adafruit_irremote.GenericTransmit( - header=[9500, 4500], one=[550, 550], zero=[550, 1700], trail=550 - ) - encoder.transmit(pulseout, decode2list(content)) - print("IR signal sent!") - pulseout.deinit() - - - - - - \ No newline at end of file diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/led.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/led.py deleted file mode 100644 index 33d556a90c..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/led.py +++ /dev/null @@ -1,34 +0,0 @@ -""" -Led - -CircuitPython library for Led - MixGoCE -======================================================= - -Small Cabbage -20210721 -dahanzimin -20210423 -""" -import board -from pwmio import PWMOut - -class Led(object): #fix - def __init__(self, pin): - self.pin = PWMOut(pin, frequency=5000, duty_cycle=65535) - - def setbrightness(self, val): - self.pin.duty_cycle = 65535 - val - - def setonoff(self, val): - if val == 1: - self.pin.duty_cycle = 0 - elif val == 0: - self.pin.duty_cycle = 65535 - else: - self.pin.duty_cycle = 65535 - self.pin.duty_cycle - - def getonoff(self): - return (65535 - self.pin.duty_cycle) // 65535 - -led_L1 = Led(board.IO33) -led_L2 = Led(board.IO34) \ No newline at end of file diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/matcher.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/matcher.py deleted file mode 100644 index 5d641ccbf3..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/matcher.py +++ /dev/null @@ -1,97 +0,0 @@ -# SPDX-FileCopyrightText: 2017 Yoch -# -# SPDX-License-Identifier: EPL-1.0 - -""" -`matcher` -==================================================================================== - -MQTT topic filter matcher from the Eclipse Project's Paho.MQTT.Python -https://github.com/eclipse/paho.mqtt.python/blob/master/src/paho/mqtt/matcher.py -* Author(s): Yoch (https://github.com/yoch) -""" - - -class MQTTMatcher: - """Intended to manage topic filters including wildcards. - - Internally, MQTTMatcher use a prefix tree (trie) to store - values associated with filters, and has an iter_match() - method to iterate efficiently over all filters that match - some topic name. - """ - - # pylint: disable=too-few-public-methods - class Node: - """Individual node on the MQTT prefix tree.""" - - __slots__ = "children", "content" - - def __init__(self): - self.children = {} - self.content = None - - def __init__(self): - self._root = self.Node() - - def __setitem__(self, key, value): - """Add a topic filter :key to the prefix tree - and associate it to :value""" - node = self._root - for sym in key.split("/"): - node = node.children.setdefault(sym, self.Node()) - node.content = value - - def __getitem__(self, key): - """Retrieve the value associated with some topic filter :key""" - try: - node = self._root - for sym in key.split("/"): - node = node.children[sym] - if node.content is None: - raise KeyError(key) - return node.content - except KeyError: - raise KeyError(key) from None - - def __delitem__(self, key): - """Delete the value associated with some topic filter :key""" - lst = [] - try: - parent, node = None, self._root - for k in key.split("/"): - parent, node = node, node.children[k] - lst.append((parent, k, node)) - node.content = None - except KeyError: - raise KeyError(key) from None - else: # cleanup - for parent, k, node in reversed(lst): - if node.children or node.content is not None: - break - del parent.children[k] - - def iter_match(self, topic): - """Return an iterator on all values associated with filters - that match the :topic""" - lst = topic.split("/") - normal = not topic.startswith("$") - - def rec(node, i=0): - if i == len(lst): - if node.content is not None: - yield node.content - else: - part = lst[i] - if part in node.children: - for content in rec(node.children[part], i + 1): - yield content - if "+" in node.children and (normal or i > 0): - for content in rec(node.children["+"], i + 1): - yield content - if "#" in node.children and (normal or i > 0): - content = node.children["#"].content - if content is not None: - yield content - - return rec(self._root) diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/matrix.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/matrix.py deleted file mode 100644 index 5ff56ccb3b..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/matrix.py +++ /dev/null @@ -1,277 +0,0 @@ -# The MIT License (MIT) -# -# Copyright (c) 2016 Radomir Dopieralski & Tony DiCola for Adafruit Industries -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. - -""" -`adafruit_ht16k33.ht16k33` -=========================== - -* Authors: Radomir Dopieralski & Tony DiCola for Adafruit Industries - -""" -from adafruit_bus_device import i2c_device -from micropython import const - -__version__ = "0.0.0-auto.0" -__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_HT16K33.git" - -_HT16K33_BLINK_CMD = const(0x80) -_HT16K33_BLINK_DISPLAYON = const(0x01) -_HT16K33_CMD_BRIGHTNESS = const(0xE0) -_HT16K33_OSCILATOR_ON = const(0x21) - - -class HT16K33: - """ - The base class for all displays. Contains common methods. - - :param int address: The I2C addess of the HT16K33. - :param bool auto_write: True if the display should immediately change when - set. If False, `show` must be called explicitly. - :param float brightness: 0.0 - 1.0 default brightness level. - """ - - def __init__(self, i2c, address=0x70, auto_write=True, brightness=1.0): - self.i2c_device = i2c_device.I2CDevice(i2c, address) - self._temp = bytearray(1) - self._buffer = bytearray(17) - self._auto_write = auto_write - self.fill(0) - self._write_cmd(_HT16K33_OSCILATOR_ON) - self._blink_rate = None - self._brightness = None - self.blink_rate = 0 - self.brightness = brightness - - def _write_cmd(self, byte): - self._temp[0] = byte - with self.i2c_device: - self.i2c_device.write(self._temp) - - @property - def blink_rate(self): - """The blink rate. Range 0-3.""" - return self._blink_rate - - @blink_rate.setter - def blink_rate(self, rate=None): - if not 0 <= rate <= 3: - raise ValueError("Blink rate must be an integer in the range: 0-3") - rate = rate & 0x03 - self._blink_rate = rate - self._write_cmd(_HT16K33_BLINK_CMD | _HT16K33_BLINK_DISPLAYON | rate << 1) - - @property - def brightness(self): - """The brightness. Range 0.0-1.0""" - return self._brightness - - @brightness.setter - def brightness(self, brightness): - if not 0.0 <= brightness <= 1.0: - raise ValueError( - "Brightness must be a decimal number in the range: 0.0-1.0" - ) - - self._brightness = brightness - xbright = round(15 * brightness) - xbright = xbright & 0x0F - self._write_cmd(_HT16K33_CMD_BRIGHTNESS | xbright) - - @property - def auto_write(self): - """Auto write updates to the display.""" - return self._auto_write - - @auto_write.setter - def auto_write(self, auto_write): - if isinstance(auto_write, bool): - self._auto_write = auto_write - else: - raise ValueError("Must set to either True or False.") - - def show(self): - """Refresh the display and show the changes.""" - with self.i2c_device: - # Byte 0 is 0x00, address of LED data register. The remaining 16 - # bytes are the display register data to set. - self.i2c_device.write(self._buffer) - - def fill(self, color): - """Fill the whole display with the given color.""" - fill = 0xFF if color else 0x00 - for i in range(16): - self._buffer[i + 1] = fill - if self._auto_write: - self.show() - - def _pixel(self, x, y, color=None): - addr = 2 * y + x // 8 - mask = 1 << x % 8 - if color is None: - return bool(self._buffer[addr + 1] & mask) - if color: - # set the bit - self._buffer[addr + 1] |= mask - else: - # clear the bit - self._buffer[addr + 1] &= ~mask - if self._auto_write: - self.show() - return None - - def _set_buffer(self, i, value): - self._buffer[i + 1] = value # Offset by 1 to move past register address. - - def _get_buffer(self, i): - return self._buffer[i + 1] # Offset by 1 to move past register address. - -class MatrixBackpack16x8(HT16K33): - """A single matrix.""" - - _columns = 16 - _rows = 8 - - def pixel(self, x, y, color=None): - """Get or set the color of a given pixel.""" - if not 0 <= x <= 15: - return None - if not 0 <= y <= 7: - return None - return super()._pixel(x, y, color) - - def __getitem__(self, key): - x, y = key - return self.pixel(x, y) - - def __setitem__(self, key, value): - x, y = key - self.pixel(x, y, value) - - # pylint: disable=too-many-branches - def shift(self, x, y, rotate=False): - """ - Shift pixels by x and y - - :param rotate: (Optional) Rotate the shifted pixels to the left side (default=False) - """ - auto_write = self.auto_write - self._auto_write = False - if x > 0: # Shift Right - for _ in range(x): - for row in range(0, self.rows): - last_pixel = self[self.columns - 1, row] if rotate else 0 - for col in range(self.columns - 1, 0, -1): - self[col, row] = self[col - 1, row] - self[0, row] = last_pixel - elif x < 0: # Shift Left - for _ in range(-x): - for row in range(0, self.rows): - last_pixel = self[0, row] if rotate else 0 - for col in range(0, self.columns - 1): - self[col, row] = self[col + 1, row] - self[self.columns - 1, row] = last_pixel - if y > 0: # Shift Up - for _ in range(y): - for col in range(0, self.columns): - last_pixel = self[col, self.rows - 1] if rotate else 0 - for row in range(self.rows - 1, 0, -1): - self[col, row] = self[col, row - 1] - self[col, 0] = last_pixel - elif y < 0: # Shift Down - for _ in range(-y): - for col in range(0, self.columns): - last_pixel = self[col, 0] if rotate else 0 - for row in range(0, self.rows - 1): - self[col, row] = self[col, row + 1] - self[col, self.rows - 1] = last_pixel - self._auto_write = auto_write - if auto_write: - self.show() - - # pylint: enable=too-many-branches - - def shift_right(self, rotate=False): - """ - Shift all pixels right - - :param rotate: (Optional) Rotate the shifted pixels to the left side (default=False) - """ - self.shift(1, 0, rotate) - - def shift_left(self, rotate=False): - """ - Shift all pixels left - - :param rotate: (Optional) Rotate the shifted pixels to the right side (default=False) - """ - self.shift(-1, 0, rotate) - - def shift_up(self, rotate=False): - """ - Shift all pixels up - - :param rotate: (Optional) Rotate the shifted pixels to bottom (default=False) - """ - self.shift(0, 1, rotate) - - def shift_down(self, rotate=False): - """ - Shift all pixels down - - :param rotate: (Optional) Rotate the shifted pixels to top (default=False) - """ - self.shift(0, -1, rotate) - - def image(self, img): - """Set buffer to value of Python Imaging Library image. The image should - be in 1 bit mode and a size equal to the display size.""" - imwidth, imheight = img.size - if imwidth != self.columns or imheight != self.rows: - raise ValueError( - "Image must be same dimensions as display ({0}x{1}).".format( - self.columns, self.rows - ) - ) - # Grab all the pixels from the image, faster than getpixel. - pixels = img.convert("1").load() - # Iterate through the pixels - for x in range(self.columns): # yes this double loop is slow, - for y in range(self.rows): # but these displays are small! - self.pixel(x, y, pixels[(x, y)]) - if self._auto_write: - self.show() - - @property - def columns(self): - """Read-only property for number of columns""" - return self._columns - - @property - def rows(self): - """Read-only property for number of rows""" - return self._rows - - - - - - diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/mixgoce.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/mixgoce.py deleted file mode 100644 index 9042c5c814..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/mixgoce.py +++ /dev/null @@ -1,85 +0,0 @@ -# Paste mixgoce.py - -import board -from rtc import RTC -from busio import I2C - -def do_connect(username, password): - import wifi - from time import sleep - - wifi.radio.connect(ssid=username.encode(), password=password.encode()) - while not wifi.radio.ipv4_address: - sleep(0.1) - print("Wi-Fi connected!") - return wifi.radio - -rtc_clock = RTC() - -try: - i2c = I2C(board.IO41, board.IO42,frequency=400000) - while not i2c.try_lock(): - pass - if 0x6a in i2c.scan(): - version = 99 - elif 0x15 in i2c.scan(): - version = 1 - else: - version = 0 - - i2c.unlock() - - if version == 99: - i2c.deinit() - elif version:#new - import mxc6655xa - acc = mxc6655xa.MXC6655XA(i2c) - - def get_temperature(): - return acc.get_temperature() - else:#old - import msa301 - acc = msa301.MSA301(i2c) - -except Exception as e: - print(e) - -ADDITIONAL_TOPIC = 'b640a0ce465fa2a4150c36b305c1c11b' -WILL_TOPIC = '9d634e1a156dc0c1611eb4c3cff57276' - -def ntp(url='http://mixio.mixly.org/time.php'): - import ssl - import wifi - import socketpool - import adafruit_requests - #TEXT_URL = 'http://mixio.mixly.org/time.php' - #TEXT_URL = url - pool = socketpool.SocketPool(wifi.radio) - requests = adafruit_requests.Session(pool, ssl.create_default_context()) - # print("Fetching text from", TEXT_URL) - response = requests.get(url) - # print("-" * 40) - # print(tuple(response.text.split(","))) - # print("-" * 40) - return tuple(response.text.split(",")) - -def analyse_sharekey(url): - import ssl - import wifi - import socketpool - import adafruit_requests - import json - #TEXT_URL = 'http://mixio.mixly.org/time.php' - #TEXT_URL = url - pool = socketpool.SocketPool(wifi.radio) - requests = adafruit_requests.Session(pool, ssl.create_default_context()) - # print("Fetching text from", TEXT_URL) - response = requests.get(url) - # print("-" * 40) - # print(tuple(response.text.split(","))) - # print("-" * 40) - if response.text == '-1': - print('Invalid share key') - else: - result = json.loads(response.text) - return (result['0'], result['1'], result['2']) \ No newline at end of file diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/mixpy.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/mixpy.py deleted file mode 100644 index 7aa9b9bc38..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/mixpy.py +++ /dev/null @@ -1,84 +0,0 @@ -#coding=utf-8 -import math - -def math_map(v, al, ah, bl, bh): - if al==ah: - return bl - if al > ah: - al, ah = ah, al - if v > ah: - v = ah - if v < al: - v = al - return bl + (bh - bl) * (v - al) / (ah - al) - -def math_mean(myList): - localList = [e for e in myList if type(e) == int or type(e) == float] - if not localList: return - return float(sum(localList)) / len(localList) - -def math_median(myList): - localList = sorted([e for e in myList if type(e) == int or type(e) == float]) - if not localList: return - if len(localList) % 2 == 0: - return (localList[len(localList) // 2 - 1] + localList[len(localList) // 2]) / 2.0 - else: - return localList[(len(localList) - 1) // 2] - -def math_modes(some_list): - modes = [] - # Using a lists of [item, count] to keep count rather than dict - # to avoid "unhashable" errors when the counted item is itself a list or dict. - counts = [] - maxCount = 1 - for item in some_list: - found = False - for count in counts: - if count[0] == item: - count[1] += 1 - maxCount = max(maxCount, count[1]) - found = True - if not found: - counts.append([item, 1]) - for counted_item, item_count in counts: - if item_count == maxCount: - modes.append(counted_item) - return modes - -def math_standard_deviation(numbers): - n = len(numbers) - if n == 0: return - mean = float(sum(numbers)) / n - variance = sum((x - mean) ** 2 for x in numbers) / n - return math.sqrt(variance) - -def lists_sort(my_list, type, reverse): - def try_float(s): - try: - return float(s) - except: - return 0 - key_funcs = { - "NUMERIC": try_float, - "TEXT": str, - "IGNORE_CASE": lambda s: str(s).lower() - } - key_func = key_funcs[type] - list_cpy = list(my_list) - return sorted(list_cpy, key=key_func, reverse=reverse) - -def format_content(mydict, cid): - if 'lat' in mydict and 'long' in mydict: - res = '{'+'"lat": "{}", "long": "{}", "clientid": "{}"'.format(mydict.pop('lat'),mydict.pop('long'),cid) - if len(mydict)>0: - res += ', "message": [' - for d in mydict: - res += '{{"label": "{}", "value": "{}"}},'.format(d,mydict[d]) - res = res[:-1] + "]" - res += '}' - return res - else: - print('Invalid Input') - -def format_str(d): - return str(d).replace("'",'"') \ No newline at end of file diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/mmatrix.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/mmatrix.py deleted file mode 100644 index eb46f4862a..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/mmatrix.py +++ /dev/null @@ -1,139 +0,0 @@ -""" -Matrix - -CircuitPython library for Matrix - MixGoCE -======================================================= - -Small Cabbage -20210721 -""" -import time -from mixgoce import i2c -from matrix import MatrixBackpack16x8 -from adafruit_framebuf import FrameBuffer,MVLSB - -buf = bytearray(16) -fb = FrameBuffer(buf, 16, 8, MVLSB) - -class MixGoMatrix(MatrixBackpack16x8): - - def show_dynamic(self, to_show, delay=200): - # self.fill(0) - if type(to_show)==str or type(to_show)==int: - for i in to_show: - fb.fill(0) - fb.text(i, 5, 0, color=1) - # turn all LEDs off - self.fill(0) - for x in range(16): - # using the FrameBuffer text result - bite = buf[x] - for y in range(8): - bit = 1 << y & bite - # if bit > 0 then set the pixel brightness - if bit: - self[x, y] = 1 - self.show() - if len(to_show)>1: - time.sleep(delay/1000) - elif type(to_show)==list or type(to_show)==tuple: - for i in to_show: - #if type(i)!=str and type(i)!=type(Image.HEART): - if type(i)!=str and type(i)!=type(bytearray(16)): - pass - for i in to_show: - self.show_dynamic(i) - time.sleep(delay/1000) - elif type(to_show)==type(bytearray(16)): - buf = to_show - # turn all LEDs off - self.fill(0) - # print(buf) - for x in range(16): - # using the FrameBuffer text result - bite = buf[x] - # print(bite,end=" ") - for y in range(8): - bit = 1 << y & bite - # if bit > 0 then set the pixel brightness - if bit: - self[x, y] = 1 - self.show() - - def scroll(self, text_to_show, delay=0): - for i in range(len(text_to_show) * 6 + 26): - fb.fill(0) - fb.text(text_to_show, -i + 16, 0, color=1) - # turn all LEDs off - self.fill(0) - for x in range(16): - # using the FrameBuffer text result - bite = buf[x] - for y in range(8): - bit = 1 << y & bite - # if bit > 0 then set the pixel brightness - if bit: - self[x, y] = 1 - self.show() - time.sleep(delay/1000) - - def show_static(self, text_to_show): - fb.fill(0) - if len(text_to_show)==2: - l = 3 - elif len(text_to_show)==1: - l = 5 - else: - l = 0 - fb.text(text_to_show, l, 0, color=1) - # turn all LEDs off - self.fill(0) - for x in range(16): - # using the FrameBuffer text result - bite = buf[x] - for y in range(8): - bit = 1 << y & bite - # if bit > 0 then set the pixel brightness - if bit: - self[x, y] = 1 - self.show() - - def set_brightness(self, val): - self.brightness = max(min(val, 1), 0) - - def get_brightness(self): - return self.brightness - - def set_pixel(self, x, y, val): - self[x, y] = val - self.show() - - def get_pixel(self, x, y): - return int(self[x, y]) - - def clear(self): - self.fill(0) - self.show() - - def up(self, times, rotate=False): - for i in range(times): - self.shift_down(rotate) - self.show() - - def down(self, times, rotate=False): - for i in range(times): - self.shift_up(rotate) - self.show() - - def left(self, times, rotate=False): - for i in range(times): - self.shift_left(rotate) - self.show() - - def right(self, times, rotate=False): - for i in range(times): - self.shift_right(rotate) - self.show() - -display = MixGoMatrix(i2c, auto_write=False, brightness=0.1) -display.clear() \ No newline at end of file diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/msa301.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/msa301.py deleted file mode 100644 index 141f5e15b0..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/msa301.py +++ /dev/null @@ -1,70 +0,0 @@ -""" -MSA301 - -CircuitPython library for the MSA301 Accelerometer -======================================================= - -dahanzimin -20210411 -mixly -""" -import time -import adafruit_bus_device.i2c_device as i2c_device -from micropython import const - -MSA301_ADDRESS = const(0x26) - -MSA301_REG_DEVICE_ID = const(0x01) -MSA301_REG_DATA = const(0x02) -MSA301_REG_ODR = const(0x10) -MSA301_REG_POWERMODE = const(0x11) -MSA301_REG_RESRANGE = const(0x0F) - - -_STANDARD_GRAVITY = 9.806 - -class MSA301: - - _BUFFER = bytearray(2) - data_reg = bytearray(6) - - def __init__(self, i2c_bus): - self._device = i2c_device.I2CDevice(i2c_bus, MSA301_ADDRESS) - if self._chip_id() != 0x13: - raise AttributeError("Cannot find a MSA301") - - self._write_u8(MSA301_REG_ODR,0X09) #RATE_500_HZ - self._write_u8(MSA301_REG_POWERMODE,0X12) #NORMAL & WIDTH_250_HZ - self._write_u8(MSA301_REG_RESRANGE,0X02) #RESOLUTION_14_BIT & RANGE_8_G - - def _read_u8(self, address): - # Read an 8-bit unsigned value from the specified 8-bit address. - with self._device: - self._BUFFER[0] = address & 0xFF - self._device.write(self._BUFFER, end=1) - self._device.readinto(self._BUFFER, end=1) - return self._BUFFER[0] - - def _write_u8(self, address, val): - # Write an 8-bit unsigned value to the specified 8-bit address. - with self._device: - self._BUFFER[0] = address & 0xFF - self._BUFFER[1] = val & 0xFF - self._device.write(self._BUFFER, end=2) - - def _chip_id(self): - return self._read_u8(MSA301_REG_DEVICE_ID) - - def u2s(self,n): - return n if n < (1 << 7) else n - (1 << 8) - - @property - def acceleration(self): - for i in range(0,6): - self.data_reg[i]=self._read_u8(MSA301_REG_DATA+i) - x_acc=((self.u2s(self.data_reg[1])<<8|self.data_reg[0])>>2)/1024.0 - y_acc=((self.u2s(self.data_reg[3])<<8|self.data_reg[2])>>2)/1024.0 - z_acc=((self.u2s(self.data_reg[5])<<8|self.data_reg[4])>>2)/1024.0 - return (-y_acc,-x_acc,z_acc) - - diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/music.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/music.py deleted file mode 100644 index fbf7f1a4ea..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/music.py +++ /dev/null @@ -1,102 +0,0 @@ -""" -Music - -CircuitPython library for Music - MixGoCE -======================================================= - -Small Cabbage -20210721 -""" - - -import time -from mixgoce import version - -class Music: - def __init__(self, pin, duration='4', octave='6', tempo='63'): - import pwmio - self.pin = pin - self.pwm = pwmio.PWMOut(pin, duty_cycle=0, variable_frequency=True) - self.duration = duration - self.octave = octave - self.tempo = tempo - - def set_duration(self, duration='4'): - self.duration = duration - - def set_octave(self, octave='6'): - self.octave = octave - - def sef_tempo(self, tempo='63'): - self.tempo = tempo - - def set_duration_tempo(self, duration, tempo):#这项等于原来的set_tempo - self.duration = duration - self.tempo = tempo - - def reset(self): - self.set_duration() - self.set_octave() - self.set_tempo() - - def get_duration(self): - return self.duration - - def get_octave(self): - return self.octave - - def get_tempo(self): - return (self.tempo) - - def play_demo(self, demo): - import adafruit_rtttl - if self.pwm: - self.pwm.deinit() - adafruit_rtttl.play(self.pin, demo) - - def play_demo_print(self, demo): - import adafruit_rtttl - if self.pwm: - self.pwm.deinit() - adafruit_rtttl.play(self.pin, demo) - _, defaults, tune = demo.lower().split(":") - print(tune) - - def play(self, note, duration=None): - import pwmio - if self.pwm: - self.pwm.deinit() - self.pwm = pwmio.PWMOut(self.pin, duty_cycle=0, variable_frequency=True) - self.pwm.frequency = int(note) - self.pwm.duty_cycle = 2 ** 15 - if duration: - # print(1) - time.sleep(duration/1000) - self.pwm.duty_cycle = 0 - - def stop(self): - self.pwm.duty_cycle = 0 - self.pwm.deinit() - -BA_DING = "itchy:d=4,o=4,b=300:4b5,2g5" -JUMP_UP = "itchy:d=4,o=5,b=300:c,d,e,f,g" -JUMP_DOWN = "itchy:d=4,o=5,b=300:g,f,e,d,c" -POWER_UP = "itchy:d=4,o=4,b=300:4g4,4c5,4e5,2g5,4e5,1g5" -POWER_DOWN = "itchy:d=4,o=4,b=300:4g5,4d#5,4c5,2g4,4b4,1c5" -DADADADUM = "itchy:d=4,o=4,b=200:2p,g,g,g,1d#,p,f,f,f,1d" -#ENTERTAINER = "itchy:d=4,o=4,b=300:4d4,d#4,e4,2c5,4e4,2c5,4e4,1c5,c5,d5,d#5,e5,c5,d5,2e5,b4,2d5,1c5" -BIRTHDAY = "itchy:d=4,o=4,b=180:6c,12c,4d,4c,4f,2e,6c,12c,4d,4c,4g,2f,6c,12c,4c5,4a,4f,4e,4d,6a#,12a#,4a,4f,4g,2f" -#BLUES = "itchy:d=4,o=4,b=400:2c,4e,4g,4a,4a#,4a,4g,4e,2c,4e,4g,4a,4a#,4a,4g,4e,4f,4a,2c,4d,4d#,4d,4c,4a,2c,4e,4g,4a,4a#,4a,4g,4e,4g,4b,4d,4f,4f,4a,4c,4d#,2c,e,g,e,g,f,e,d" -#PYTHON = "itchy:d=4,o=4,b=380:d5,4b4,p,b4,b4,a#4,b4,g5,p,d5,d5,r,b4,c5,p,c5,c5,p,d5,1e5,p,c5,a4,p,a4,a4,g#4,a4,f#5,p,e5,e5,p,c5,b4,p,b4,b4,p,c5,1d5,p,d5,4b4,p,b4,b4,a#4,b4,b5,p,g5,g5,p,d5,c#5,p,a5,a5,p,a5,1a5,p,g5,f#5,p,a5,a5,g#5,a5,e5,p,a5,a5,g#5,a5,d5,p,c#5,d5,p,c#5,2d5,3p" - -import board - -if version:#new - buzzer = Music(board.IO40) -else:#old - from digitalio import DigitalInOut, Direction - - buzzer = Music(board.IO17) - spk_en = DigitalInOut(board.IO40) - spk_en.direction = Direction.OUTPUT - spk_en.value = True \ No newline at end of file diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/mxc6655xa.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/mxc6655xa.py deleted file mode 100644 index 0a5c83dbe0..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/mxc6655xa.py +++ /dev/null @@ -1,80 +0,0 @@ -""" -MXC6655XA - -CircuitPython library for the MXC6655XA Accelerometer -======================================================= - -dahanzimin -20210411 -mixly -""" -import time -import adafruit_bus_device.i2c_device as i2c_device -from micropython import const - -MXC6655XA_ADDRESS = const(0x15) - -MXC6655XA_REG_DATA = const(0x03) -MXC6655XA_REG_CTRL = const(0x0D) -MXC6655XA_REG_DEVICE_ID = const(0x0E) - -MXC6655XA_CMD_8G_POWER_ON = const(0x40) -MXC6655XA_CMD_4G_POWER_ON = const(0x20) -MXC6655XA_CMD_2G_POWER_ON = const(0x00) - -MXC6655XA_2G_SENSITIVITY =1024 -MXC6655XA_4G_SENSITIVITY =512 -MXC6655XA_8G_SENSITIVITY =256 -MXC6655XA_T_ZERO =25 -MXC6655XA_T_SENSITIVITY =0.586 - -class MXC6655XA: - - _BUFFER = bytearray(2) - data_reg = bytearray(7) - - def __init__(self, i2c_bus): - self._device = i2c_device.I2CDevice(i2c_bus, MXC6655XA_ADDRESS) - if self._chip_id() != 0x02: - raise AttributeError("Cannot find a MXC6655XA") - self._Enable() #star - time.sleep(0.3) - self.range = MXC6655XA_8G_SENSITIVITY #SET 8g range - - def _read_u8(self, address): - # Read an 8-bit unsigned value from the specified 8-bit address. - with self._device: - self._BUFFER[0] = address & 0xFF - self._device.write(self._BUFFER, end=1) - self._device.readinto(self._BUFFER, end=1) - return self._BUFFER[0] - - def _write_u8(self, address, val): - # Write an 8-bit unsigned value to the specified 8-bit address. - with self._device: - self._BUFFER[0] = address & 0xFF - self._BUFFER[1] = val & 0xFF - self._device.write(self._BUFFER, end=2) - - def _chip_id(self): - return self._read_u8(MXC6655XA_REG_DEVICE_ID) - - def _Enable(self): - self._write_u8(MXC6655XA_REG_CTRL,MXC6655XA_CMD_8G_POWER_ON) - - def u2s(self,n): - return n if n < (1 << 7) else n - (1 << 8) - - @property - def acceleration(self): - for i in range(0,6): - self.data_reg[i]=self._read_u8(MXC6655XA_REG_DATA+i) - x_acc=float((self.u2s(self.data_reg[0])<<8|self.data_reg[1])>>4)/self.range - y_acc=float((self.u2s(self.data_reg[2])<<8|self.data_reg[3])>>4)/self.range - z_acc=float((self.u2s(self.data_reg[4])<<8|self.data_reg[5])>>4)/self.range - #t_acc=float(self.u2s(self.data_reg[6]))*MXC6655XA_T_SENSITIVITY + MXC6655XA_T_ZERO - return (-y_acc,-x_acc,z_acc) - - def get_temperature(self): - t_acc=float(self.u2s(self._read_u8(MXC6655XA_REG_DATA+6)))*MXC6655XA_T_SENSITIVITY + MXC6655XA_T_ZERO - return round(t_acc,1) diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/pixels.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/pixels.py deleted file mode 100644 index f51055e510..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/pixels.py +++ /dev/null @@ -1,71 +0,0 @@ -""" -RGB - -CircuitPython library for RGB - MixGoCE -======================================================= - -Small Cabbage -20210721 -""" - - -import time -import board -import neopixel - -def wheel(pos): - # Input a value 0 to 255 to get a color value. - # The colours are a transition r - g - b - back to r. - if pos < 0 or pos > 255: - return (0, 0, 0) - if pos < 85: - return (255 - pos * 3, pos * 3, 0) - if pos < 170: - pos -= 85 - return (0, 255 - pos * 3, pos * 3) - pos -= 170 - return (pos * 3, 0, 255 - pos * 3) - - -class RGB(object): - def __init__(self,pin=board.IO8,num=4,flag="RGB"): - self.pin = pin - self.num = num - self.pixels = neopixel.NeoPixel(pin, num, brightness=0.3, auto_write=False, pixel_order = flag) - - def show_all(self, R, G, B): - color = (R, G, B) - self.pixels.fill(color) - - def show_one(self, index, R, G, B): - color = (R, G, B) - self.pixels[index - 1] = color - - def write(self): - self.pixels.show() - - def color_chase(self, R, G, B, wait): - color = (R, G, B) - for i in range(self.num): - self.pixels[i] = color - time.sleep(wait/1000) - self.pixels.show() - - def rainbow_cycle(self, wait): - for j in range(255): - for i in range(self.num): - rc_index = (i * 256 // self.num) + j - self.pixels[i] = wheel(rc_index & 255) - self.pixels.show() - time.sleep(wait/1000/256) - - def change_mod(self,flag): - import time - if flag in ("RGB","GRB"): - self.pixels.deinit() - time.sleep(0.1) - self.pixels = neopixel.NeoPixel(self.pin, self.num, brightness=0.3, auto_write=False, pixel_order = flag) - -rgb = RGB() -rgb.show_all(0,0,0) -rgb.write() \ No newline at end of file diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/sensor.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/sensor.py deleted file mode 100644 index 56d6465201..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/sensor.py +++ /dev/null @@ -1,55 +0,0 @@ -""" -Analog Sensor - -CircuitPython library for Analog Sensor - MixGoCE -======================================================= - -Small Cabbage -20210721 -dahanzimin -20210423 -""" -from mixgoce import version - -class ADCSensor: - import board - __pins=[board.IO13,board.IO15,board.IO16] - __species = {} - __first_init = True - def __new__(cls, pin, *args, **kwargs): - if pin not in cls.__species.keys(): - cls.__first_init = True - cls.__species[pin]=object.__new__(cls) - return cls.__species[pin] - - def __init__(self,pin): - from analogio import AnalogIn - if self.__first_init: - self.__first_init = False - self.pin = AnalogIn(self.__pins[pin]) - - def read(self): - return self.pin.value - -def get_brightness(): - return ADCSensor(1).read() - -def get_soundlevel(): #fix - value_d= [] - for _ in range(5): - values = [] - for _ in range(5): - val = ADCSensor(0).read() - values.append(val) - value_d.append(max(values) - min(values)) - return max(value_d) - - -if version:#new - from mixgoce import acc - def get_temperature(): - return acc.get_temperature() -else:#old - def get_temperature(): - adc_val = ADCSensor(2).read() - return adc_val * 3.9 / 5900 \ No newline at end of file diff --git a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/touchpad.py b/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/touchpad.py deleted file mode 100644 index 1aa9b78c6b..0000000000 --- a/ports/espressif/boards/mixgo_ce_udisk/mixgoce_lib/touchpad.py +++ /dev/null @@ -1,25 +0,0 @@ -""" -TouchPad - -CircuitPython library for TouchPad - MixGoCE -======================================================= - -Small Cabbage -20210721 -dahanzimin -20210423 -""" -import board -from touchio import TouchIn -class TouchPad(): - def __init__(self,pin,v=18000): - self.pin = TouchIn(pin) - self.v = v - - def is_touched(self): - return self.pin.raw_value > self.v - -touch_T1 = TouchPad(board.IO4) -touch_T2 = TouchPad(board.IO5) -touch_T3 = TouchPad(board.IO6) -touch_T4 = TouchPad(board.IO7) \ No newline at end of file diff --git a/ports/espressif/boards/mixgo_ce_udisk/mpconfigboard.mk b/ports/espressif/boards/mixgo_ce_udisk/mpconfigboard.mk index dbd55bc52a..99c623985e 100644 --- a/ports/espressif/boards/mixgo_ce_udisk/mpconfigboard.mk +++ b/ports/espressif/boards/mixgo_ce_udisk/mpconfigboard.mk @@ -20,5 +20,5 @@ CIRCUITPY_MODULE=wroom FROZEN_MPY_DIRS += $(TOP)/frozen/Adafruit_CircuitPython_Requests FROZEN_MPY_DIRS += $(TOP)/frozen/Adafruit_CircuitPython_NeoPixel -FROZEN_MPY_DIRS += boards/$(BOARD)/mixgoce_lib +FROZEN_MPY_DIRS += boards/$(BOARD)/cp_lib/mixgoce_lib