Merge pull request #6085 from rimwolf-redux/msgpack

changed msgpack to use ByteStream type annotations
This commit is contained in:
Dan Halbert 2022-03-01 10:17:10 -05:00 committed by GitHub
commit 548d6beb4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 60 additions and 143 deletions

View File

@ -24,3 +24,6 @@ intelhex
# for building & testing natmods
pyelftools
# for stubs and annotations
adafruit-circuitpython-typing

View File

@ -15,3 +15,6 @@ sphinx-rtd-theme
sphinxcontrib-svg2pdfconverter
readthedocs-sphinx-search
myst-parser
# For stubs and annotations
adafruit-circuitpython-typing

View File

@ -64,7 +64,7 @@
//| This object is the sole instance of `alarm.SleepMemory`."""
//|
//| wake_alarm: Optional[Alarm]
//| wake_alarm: Optional[circuitpython_typing.Alarm]
//| """The most recently triggered alarm. If CircuitPython was sleeping, the alarm that woke it from sleep.
//| If no alarm occured since the last hard reset or soft restart, value is ``None``.
//| """
@ -83,7 +83,7 @@ STATIC void validate_objs_are_alarms(size_t n_args, const mp_obj_t *objs) {
}
}
//| def light_sleep_until_alarms(*alarms: Alarm) -> Alarm:
//| def light_sleep_until_alarms(*alarms: circuitpython_typing.Alarm) -> circuitpython_typing.Alarm:
//| """Go into a light sleep until awakened one of the alarms. The alarm causing the wake-up
//| is returned, and is also available as `alarm.wake_alarm`.
//|
@ -111,7 +111,7 @@ STATIC mp_obj_t alarm_light_sleep_until_alarms(size_t n_args, const mp_obj_t *ar
}
MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(alarm_light_sleep_until_alarms_obj, 1, MP_OBJ_FUN_ARGS_MAX, alarm_light_sleep_until_alarms);
//| def exit_and_deep_sleep_until_alarms(*alarms: Alarm) -> None:
//| def exit_and_deep_sleep_until_alarms(*alarms: circuitpython_typing.Alarm) -> None:
//| """Exit the program and go into a deep sleep, until awakened by one of the alarms.
//| This function does not return.
//|

View File

@ -1,69 +0,0 @@
"""Types for the C-level protocols"""
from typing import Union
import alarm
import alarm.pin
import alarm.time
import array
import audiocore
import audiomixer
import audiomp3
import rgbmatrix
import ulab.numpy
ReadableBuffer = Union[
bytes, bytearray, memoryview, array.array, ulab.numpy.ndarray, rgbmatrix.RGBMatrix
]
"""Classes that implement the readable buffer protocol
- `bytes`
- `bytearray`
- `memoryview`
- `array.array`
- `ulab.numpy.ndarray`
- `rgbmatrix.RGBMatrix`
"""
WriteableBuffer = Union[
bytearray, memoryview, array.array, ulab.numpy.ndarray, rgbmatrix.RGBMatrix
]
"""Classes that implement the writeable buffer protocol
- `bytearray`
- `memoryview`
- `array.array`
- `ulab.numpy.ndarray`
- `rgbmatrix.RGBMatrix`
"""
AudioSample = Union[
audiocore.WaveFile, audiocore.RawSample, audiomixer.Mixer, audiomp3.MP3Decoder, synthio.MidiTrack
]
"""Classes that implement the audiosample protocol
- `audiocore.WaveFile`
- `audiocore.RawSample`
- `audiomixer.Mixer`
- `audiomp3.MP3Decoder`
- `synthio.MidiTrack`
You can play these back with `audioio.AudioOut`, `audiobusio.I2SOut` or `audiopwmio.PWMAudioOut`.
"""
FrameBuffer = Union[rgbmatrix.RGBMatrix]
"""Classes that implement the framebuffer protocol
- `rgbmatrix.RGBMatrix`
"""
Alarm = Union[
alarm.pin.PinAlarm, alarm.time.TimeAlarm
]
"""Classes that implement alarms for sleeping and asynchronous notification.
- `alarm.pin.PinAlarm`
- `alarm.time.TimeAlarm`
You can use these alarms to wake up from light or deep sleep.
"""

View File

@ -83,11 +83,11 @@
//| """
//|
//| def pack(obj: object, buffer: WriteableBuffer, *, default: Union[Callable[[object], None], None] = None) -> None:
//| """Ouput object to buffer in msgpack format.
//| def pack(obj: object, stream: circuitpython_typing.ByteStream, *, default: Union[Callable[[object], None], None] = None) -> None:
//| """Output object to stream in msgpack format.
//|
//| :param object obj: Object to convert to msgpack format.
//| :param ~circuitpython_typing.WriteableBuffer buffer: buffer to write into
//| :param ~circuitpython_typing.ByteStream stream: stream to write to
//| :param Optional[~circuitpython_typing.Callable[[object], None]] default:
//| function called for python objects that do not have
//| a representation in msgpack format.
@ -98,7 +98,7 @@ STATIC mp_obj_t mod_msgpack_pack(size_t n_args, const mp_obj_t *pos_args, mp_map
enum { ARG_obj, ARG_buffer, ARG_default };
STATIC const mp_arg_t allowed_args[] = {
{ MP_QSTR_obj, MP_ARG_REQUIRED | MP_ARG_OBJ },
{ MP_QSTR_buffer, MP_ARG_REQUIRED | MP_ARG_OBJ },
{ MP_QSTR_stream, MP_ARG_REQUIRED | MP_ARG_OBJ },
{ MP_QSTR_default, MP_ARG_KW_ONLY | MP_ARG_OBJ, { .u_obj = mp_const_none } },
};
mp_arg_val_t args[MP_ARRAY_SIZE(allowed_args)];
@ -115,22 +115,22 @@ STATIC mp_obj_t mod_msgpack_pack(size_t n_args, const mp_obj_t *pos_args, mp_map
MP_DEFINE_CONST_FUN_OBJ_KW(mod_msgpack_pack_obj, 0, mod_msgpack_pack);
//| def unpack(buffer: ReadableBuffer, *, ext_hook: Union[Callable[[int, bytes], object], None] = None, use_list: bool=True) -> object:
//| """Unpack and return one object from buffer.
//| def unpack(stream: circuitpython_typing.ByteStream, *, ext_hook: Union[Callable[[int, bytes], object], None] = None, use_list: bool=True) -> object:
//| """Unpack and return one object from stream.
//|
//| :param ~circuitpython_typing.ReadableBuffer buffer: buffer to read from
//| :param ~circuitpython_typing.ByteStream stream: stream to read from
//| :param Optional[~circuitpython_typing.Callable[[int, bytes], object]] ext_hook: function called for objects in
//| msgpack ext format.
//| :param Optional[bool] use_list: return array as list or tuple (use_list=False).
//|
//| :return object: object read from buffer.
//| :return object: object read from stream.
//| """
//| ...
//|
STATIC mp_obj_t mod_msgpack_unpack(size_t n_args, const mp_obj_t *pos_args, mp_map_t *kw_args) {
enum { ARG_buffer, ARG_ext_hook, ARG_use_list };
STATIC const mp_arg_t allowed_args[] = {
{ MP_QSTR_buffer, MP_ARG_REQUIRED | MP_ARG_OBJ, },
{ MP_QSTR_stream, MP_ARG_REQUIRED | MP_ARG_OBJ, },
{ MP_QSTR_ext_hook, MP_ARG_KW_ONLY | MP_ARG_OBJ, { .u_obj = mp_const_none } },
{ MP_QSTR_use_list, MP_ARG_KW_ONLY | MP_ARG_BOOL, { .u_bool = true } },
};

View File

@ -11,53 +11,45 @@ import os
import re
import sys
import traceback
import types
import typing
import isort
import black
import circuitpython_typing
import circuitpython_typing.socket
IMPORTS_IGNORE = frozenset(
{
"int",
"float",
"bool",
"str",
"bytes",
"tuple",
"list",
"set",
"dict",
"bytearray",
"slice",
"file",
"buffer",
"range",
"array",
"bool",
"buffer",
"bytearray",
"bytes",
"dict",
"file",
"float",
"int",
"list",
"range",
"set",
"slice",
"str",
"struct_time",
"tuple",
}
)
IMPORTS_TYPING = frozenset(
{
"Any",
"Dict",
"Optional",
"Union",
"Tuple",
"List",
"Sequence",
"NamedTuple",
"Iterable",
"Iterator",
"Callable",
"AnyStr",
"overload",
"Type",
# Include all definitions in these type modules, minus some name conflicts.
AVAILABLE_TYPE_MODULE_IMPORTS = {
"types": frozenset(types.__all__),
# Conflicts: countio.Counter, canio.Match
"typing": frozenset(typing.__all__) - set(["Counter", "Match"]),
"circuitpython_typing": frozenset(circuitpython_typing.__all__),
"circuitpython_typing.socket": frozenset(circuitpython_typing.socket.__all__),
}
)
IMPORTS_TYPES = frozenset({"TracebackType"})
CPY_TYPING = frozenset(
{"ReadableBuffer", "WriteableBuffer", "AudioSample", "FrameBuffer", "Alarm"}
)
def is_typed(node, allow_any=False):
@ -116,9 +108,7 @@ def find_stub_issues(tree):
def extract_imports(tree):
modules = set()
typing = set()
types = set()
cpy_typing = set()
used_type_module_imports = {module: set() for module in AVAILABLE_TYPE_MODULE_IMPORTS.keys()}
def collect_annotations(anno_tree):
if anno_tree is None:
@ -127,12 +117,9 @@ def extract_imports(tree):
if isinstance(node, ast.Name):
if node.id in IMPORTS_IGNORE:
continue
elif node.id in IMPORTS_TYPING:
typing.add(node.id)
elif node.id in IMPORTS_TYPES:
types.add(node.id)
elif node.id in CPY_TYPING:
cpy_typing.add(node.id)
for module, imports in AVAILABLE_TYPE_MODULE_IMPORTS.items():
if node.id in imports:
used_type_module_imports[module].add(node.id)
elif isinstance(node, ast.Attribute):
if isinstance(node.value, ast.Name):
modules.add(node.value.id)
@ -145,15 +132,12 @@ def extract_imports(tree):
elif isinstance(node, ast.FunctionDef):
collect_annotations(node.returns)
for deco in node.decorator_list:
if isinstance(deco, ast.Name) and (deco.id in IMPORTS_TYPING):
typing.add(deco.id)
if isinstance(deco, ast.Name) and (
deco.id in AVAILABLE_TYPE_MODULE_IMPORTS["typing"]
):
used_type_module_imports["typing"].add(deco.id)
return {
"modules": sorted(modules),
"typing": sorted(typing),
"types": sorted(types),
"cpy_typing": sorted(cpy_typing),
}
return (modules, used_type_module_imports)
def find_references(tree):
@ -237,15 +221,11 @@ def convert_folder(top_level, stub_directory):
ok += 1
# Add import statements
imports = extract_imports(tree)
imports, type_imports = extract_imports(tree)
import_lines = ["from __future__ import annotations"]
if imports["types"]:
import_lines.append("from types import " + ", ".join(imports["types"]))
if imports["typing"]:
import_lines.append("from typing import " + ", ".join(imports["typing"]))
if imports["cpy_typing"]:
import_lines.append("from circuitpython_typing import " + ", ".join(imports["cpy_typing"]))
import_lines.extend(f"import {m}" for m in imports["modules"])
for type_module, used_types in type_imports.items():
import_lines.append(f"from {type_module} import {', '.join(sorted(used_types))}")
import_lines.extend(f"import {m}" for m in sorted(imports))
import_body = "\n".join(import_lines)
m = re.match(r'(\s*""".*?""")', stub_contents, flags=re.DOTALL)
if m:

View File

@ -2,7 +2,7 @@
rm -rf test-stubs
python3 -mvenv test-stubs
. test-stubs/bin/activate
pip install mypy isort black wheel
pip install mypy isort black adafruit-circuitpython-typing wheel
rm -rf circuitpython-stubs .mypy_cache
make stubs
pip install --force-reinstall circuitpython-stubs/dist/circuitpython-stubs-*.tar.gz