Rework the analogbufio API.
* read() is now readinto() and takes the buffer to write into. * readinto() returns the number of valid samples. * readinto() can be interrupted by ctrl-c. * readinto() API doesn't support signed numbers because it never did. * sample_rate is now required in the constructor because supported values will vary per-port. * 16 bit values are full range. 12 bit samples from RP2040 are stretched in the same way they are for AnalogIn. Fixes #7226
This commit is contained in:
@ -159,6 +159,9 @@ msgid "%q must be >= %d"
msgstr ""
#: shared-bindings/analogbufio/BufferedIn.c
msgid "%q must be a bytearray or array of type 'H' or 'B'"
msgstr ""
#: shared-bindings/audiocore/RawSample.c
msgid "%q must be a bytearray or array of type 'h', 'H', 'b', or 'B'"
msgstr ""
@ -33,6 +33,7 @@
#include "common-hal/analogbufio/BufferedIn.h"
#include "shared-bindings/analogbufio/BufferedIn.h"
#include "shared-bindings/microcontroller/Pin.h"
#include "shared/runtime/interrupt_char.h"
#include "py/runtime.h"
#include "supervisor/shared/translate/translate.h"
#include "src/rp2_common/hardware_adc/include/hardware/adc.h"
@ -42,13 +43,18 @@
#define ADC_PIN_COUNT 4
void common_hal_analogbufio_bufferedin_construct(analogbufio_bufferedin_obj_t *self, const mcu_pin_obj_t *pin, uint8_t *buffer, uint32_t len, uint8_t bytes_per_sample, bool samples_signed, uint32_t sample_rate) {
#define ADC_CLOCK_INPUT 48000000
void common_hal_analogbufio_bufferedin_construct(analogbufio_bufferedin_obj_t *self, const mcu_pin_obj_t *pin, uint32_t sample_rate) {
// Make sure pin number is in range for ADC
if (pin->number < ADC_FIRST_PIN_NUMBER || pin->number >= (ADC_FIRST_PIN_NUMBER + ADC_PIN_COUNT)) {
// Validate sample rate here
sample_rate = (uint32_t)mp_arg_validate_int_range(sample_rate, ADC_CLOCK_INPUT / ADC_MAX_CLOCK_DIV, ADC_CLOCK_INPUT / 96, MP_QSTR_sample_rate);
// Set pin and channel
self->pin = pin;
@ -56,45 +62,13 @@ void common_hal_analogbufio_bufferedin_construct(analogbufio_bufferedin_obj_t *s
// TODO: find a way to accept ADC4 for temperature
self->chan = pin->number - ADC_FIRST_PIN_NUMBER;
// Set buffer and length
self->buffer = buffer;
self->len = len;
// Set sample rate - used in read
self->bytes_per_sample = bytes_per_sample;
self->sample_rate = sample_rate;
// Init GPIO for analogue use: hi-Z, no pulls, disable digital input buffer.
// TODO: Make sure we share the ADC well. Right now we just assume it is
// unused.
adc_select_input(self->chan); // chan = pin - 26 ??
// RP2040 Implementation Detail
// Fills the supplied buffer with ADC values using DMA transfer.
// If the buffer is 8-bit, then values are 8-bit shifted and error bit is off.
// If buffer is 16-bit, then values are not shifted and error bit is present.
// Number of transfers is always the number of samples which is the array
// byte length divided by the bytes_per_sample.
// self->bytes_per_sample == 1
uint dma_size = DMA_SIZE_8;
bool show_error_bit = false;
bool shift_sample_8_bits = true;
if (self->bytes_per_sample == 2) {
dma_size = DMA_SIZE_16;
show_error_bit = true;
shift_sample_8_bits = false;
// adc_select_input(self->pin->number - ADC_FIRST_PIN_NUMBER);
true, // Write each completed conversion to the sample FIFO
true, // Enable DMA data request (DREQ)
1, // DREQ (and IRQ) asserted when at least 1 sample present
show_error_bit, // See the ERR bit
shift_sample_8_bits // Shift each sample to 8 bits when pushing to FIFO
// Divisor of 0 -> full speed. Free-running capture with the divider is
// equivalent to pressing the ADC_CS_START_ONCE button once per `div + 1`
// cycles (div not necessarily an integer). Each conversion takes 96
@ -104,7 +78,9 @@ void common_hal_analogbufio_bufferedin_construct(analogbufio_bufferedin_obj_t *s
// sample rate determines divisor, not zero.
// sample_rate is forced to be >= 1 in shared-bindings
adc_set_clkdiv((float)48000000.0 / (float)self->sample_rate);
float clk_div = (float)ADC_CLOCK_INPUT / (float)sample_rate;
mp_printf(&mp_plat_print, "clk_div %f for %d\n", (double)clk_div, sample_rate);
// Set up the DMA to start transferring data as soon as it appears in FIFO
uint dma_chan = dma_claim_unused_channel(true);
@ -114,7 +90,6 @@ void common_hal_analogbufio_bufferedin_construct(analogbufio_bufferedin_obj_t *s
self->cfg = dma_channel_get_default_config(dma_chan);
// Reading from constant address, writing to incrementing byte addresses
channel_config_set_transfer_data_size(&(self->cfg), dma_size);
channel_config_set_read_increment(&(self->cfg), false);
channel_config_set_write_increment(&(self->cfg), true);
@ -143,14 +118,36 @@ void common_hal_analogbufio_bufferedin_deinit(analogbufio_bufferedin_obj_t *self
void common_hal_analogbufio_bufferedin_read(analogbufio_bufferedin_obj_t *self) {
uint32_t common_hal_analogbufio_bufferedin_readinto(analogbufio_bufferedin_obj_t *self, uint8_t *buffer, uint32_t len, uint8_t bytes_per_sample) {
// RP2040 Implementation Detail
// Fills the supplied buffer with ADC values using DMA transfer.
// If the buffer is 8-bit, then values are 8-bit shifted and error bit is off.
// If buffer is 16-bit, then values are not shifted and error bit is present.
// Number of transfers is always the number of samples which is the array
// byte length divided by the bytes_per_sample.
uint dma_size = DMA_SIZE_8;
bool show_error_bit = false;
if (bytes_per_sample == 2) {
dma_size = DMA_SIZE_16;
show_error_bit = true;
uint32_t cdl = self->len / self->bytes_per_sample;
true, // Write each completed conversion to the sample FIFO
true, // Enable DMA data request (DREQ)
1, // DREQ (and IRQ) asserted when at least 1 sample present
show_error_bit, // See the ERR bit
bytes_per_sample == 1 // Shift each sample to 8 bits when pushing to FIFO
uint32_t sample_count = len / bytes_per_sample;
channel_config_set_transfer_data_size(&(self->cfg), dma_size);
dma_channel_configure(self->dma_chan, &(self->cfg),
self->buffer, // dst
buffer, // dst
&adc_hw->fifo, // src
cdl, // transfer count
sample_count, // transfer count
true // start immediately
@ -159,9 +156,34 @@ void common_hal_analogbufio_bufferedin_read(analogbufio_bufferedin_obj_t *self)
// Once DMA finishes, stop any new conversions from starting, and clean up
// the FIFO in case the ADC was still mid-conversion.
uint32_t remaining_transfers = sample_count;
while (dma_channel_is_busy(self->dma_chan) &&
!mp_hal_is_interrupted()) {
remaining_transfers = dma_channel_hw_addr(self->dma_chan)->transfer_count;
// Clean up
// Stopping early so abort.
if (dma_channel_is_busy(self->dma_chan)) {
size_t captured_count = sample_count - remaining_transfers;
if (dma_size == DMA_SIZE_16) {
uint16_t *buf16 = (uint16_t *)buffer;
for (size_t i = 0; i < captured_count; i++) {
uint16_t value = buf16[i];
// Check the error bit and "truncate" the buffer if there is an error.
if ((value & ADC_FIFO_ERR_BITS) != 0) {
captured_count = i;
// Scale the values to the standard 16 bit range.
buf16[i] = (value << 4) | (value >> 8);
return captured_count;
@ -41,16 +41,9 @@
typedef struct {
mp_obj_base_t base;
const mcu_pin_obj_t *pin;
uint8_t *buffer;
uint32_t len;
uint8_t bytes_per_sample;
bool samples_signed;
uint32_t sample_rate;
uint8_t chan;
uint dma_chan;
dma_channel_config cfg;
} analogbufio_bufferedin_obj_t;
void bufferedin_init(void);
@ -49,8 +49,8 @@
//| length = 1000
//| mybuffer = array.array("H", 0x0000 for i in range(length))
//| rate = 500000
//| adcbuf = analogbufio.BufferedIn(board.GP26, mybuffer, rate)
//| adcbuf = analogbufio.BufferedIn(board.GP26, sample_rate=rate)
//| adcbuf.readinto(mybuffer)
//| adcbuf.deinit()
//| for i in range(length):
//| print(i, mybuffer[i])
@ -60,26 +60,17 @@
//| (TODO) Provide mechanism to read CPU Temperature."""
//| def __init__(
//| self, pin: microcontroller.Pin, buffer: WriteableBuffer, *, sample_rate: int = 500000
//| ) -> None:
//| """Create a `BufferedIn` on the given pin. ADC values will be read
//| into the given buffer at the supplied sample_rate. Depending on the
//| buffer typecode, 'b', 'B', 'h', 'H', samples are 8-bit byte-arrays or
//| 16-bit half-words and are signed or unsigned.
//| The ADC most significant bits of the ADC are kept. (See
//| def __init__(self, pin: microcontroller.Pin, *, sample_rate: int) -> None:
//| """Create a `BufferedIn` on the given pin and given sample rate.
//| :param ~microcontroller.Pin pin: the pin to read from
//| :param ~circuitpython_typing.WriteableBuffer buffer: buffer: A buffer for samples
//| :param ~int sample_rate: rate: sampling frequency, in samples per second"""
//| ...
STATIC mp_obj_t analogbufio_bufferedin_make_new(const mp_obj_type_t *type, size_t n_args, size_t n_kw, const mp_obj_t *all_args) {
enum { ARG_pin, ARG_buffer, ARG_sample_rate };
enum { ARG_pin, ARG_sample_rate };
static const mp_arg_t allowed_args[] = {
{ MP_QSTR_sample_rate, MP_ARG_KW_ONLY | MP_ARG_INT, {.u_int = 500000} },
mp_arg_val_t args[MP_ARRAY_SIZE(allowed_args)];
mp_arg_parse_all_kw_array(n_args, n_kw, all_args, MP_ARRAY_SIZE(allowed_args), allowed_args, args);
@ -87,37 +78,12 @@ STATIC mp_obj_t analogbufio_bufferedin_make_new(const mp_obj_type_t *type, size_
// Validate Pin
const mcu_pin_obj_t *pin = validate_obj_is_free_pin(args[ARG_pin].u_obj);
// Buffer defined and allocated by user
mp_buffer_info_t bufinfo;
mp_get_buffer_raise(args[ARG_buffer].u_obj, &bufinfo, MP_BUFFER_READ);
// signed or unsigned, byte per sample
bool signed_samples = bufinfo.typecode == 'b' || bufinfo.typecode == 'h';
uint8_t bytes_per_sample = 1;
// Bytes Per Sample
if (bufinfo.typecode == 'h' || bufinfo.typecode == 'H') {
bytes_per_sample = 2;
} else if (bufinfo.typecode != 'b' && bufinfo.typecode != 'B' && bufinfo.typecode != BYTEARRAY_TYPECODE) {
mp_raise_ValueError_varg(translate("%q must be a bytearray or array of type 'h', 'H', 'b', or 'B'"), MP_QSTR_buffer);
// Validate sample rate here
uint32_t sample_rate = (uint32_t)mp_arg_validate_int_range(args[ARG_sample_rate].u_int, 1, 500000, MP_QSTR_sample_rate);
// Create local object
analogbufio_bufferedin_obj_t *self = m_new_obj(analogbufio_bufferedin_obj_t);
analogbufio_bufferedin_obj_t *self = m_new_obj_with_finaliser(analogbufio_bufferedin_obj_t);
self->base.type = &analogbufio_bufferedin_type;
// Call local intereface in ports/common-hal/analogbufio
((uint8_t *)bufinfo.buf),
// Call local interface in ports/common-hal/analogbufio
common_hal_analogbufio_bufferedin_construct(self, pin, args[ARG_sample_rate].u_int);
return MP_OBJ_FROM_PTR(self);
@ -153,23 +119,46 @@ STATIC mp_obj_t analogbufio_bufferedin___exit__(size_t n_args, const mp_obj_t *a
STATIC MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(analogbufio_bufferedin___exit___obj, 4, 4, analogbufio_bufferedin___exit__);
//| def read(self) -> None:
//| """Fills the provided buffer with ADC voltage values."""
//| def readinto(self, buffer: WriteableBuffer) -> int:
//| """Fills the provided buffer with ADC voltage values.
//| ADC values will be read into the given buffer at the supplied sample_rate.
//| Depending on the buffer typecode, 'B', 'H', samples are 8-bit byte-arrays or
//| 16-bit half-words and are always unsigned.
//| The ADC most significant bits of the ADC are kept. (See
//| :param ~circuitpython_typing.WriteableBuffer buffer: buffer: A buffer for samples"""
//| ...
STATIC mp_obj_t analogbufio_bufferedin_obj_read(mp_obj_t self_in) {
STATIC mp_obj_t analogbufio_bufferedin_obj_readinto(mp_obj_t self_in, mp_obj_t buffer_obj) {
analogbufio_bufferedin_obj_t *self = MP_OBJ_TO_PTR(self_in);
return mp_const_none;
// Buffer defined and allocated by user
mp_buffer_info_t bufinfo;
mp_get_buffer_raise(buffer_obj, &bufinfo, MP_BUFFER_READ);
uint8_t bytes_per_sample = 1;
// Bytes Per Sample
if (bufinfo.typecode == 'H') {
bytes_per_sample = 2;
} else if (bufinfo.typecode != 'B' && bufinfo.typecode != BYTEARRAY_TYPECODE) {
mp_raise_ValueError_varg(translate("%q must be a bytearray or array of type 'H' or 'B'"), MP_QSTR_buffer);
mp_uint_t captured = common_hal_analogbufio_bufferedin_readinto(self, bufinfo.buf, bufinfo.len, bytes_per_sample);
return MP_OBJ_NEW_SMALL_INT(captured);
MP_DEFINE_CONST_FUN_OBJ_1(analogbufio_bufferedin_read_obj, analogbufio_bufferedin_obj_read);
MP_DEFINE_CONST_FUN_OBJ_2(analogbufio_bufferedin_readinto_obj, analogbufio_bufferedin_obj_readinto);
STATIC const mp_rom_map_elem_t analogbufio_bufferedin_locals_dict_table[] = {
{ MP_ROM_QSTR(MP_QSTR___del__), MP_ROM_PTR(&analogbufio_bufferedin_deinit_obj) },
{ MP_ROM_QSTR(MP_QSTR_deinit), MP_ROM_PTR(&analogbufio_bufferedin_deinit_obj) },
{ MP_ROM_QSTR(MP_QSTR___enter__), MP_ROM_PTR(&default___enter___obj) },
{ MP_ROM_QSTR(MP_QSTR___exit__), MP_ROM_PTR(&analogbufio_bufferedin___exit___obj) },
{ MP_ROM_QSTR(MP_QSTR_read), MP_ROM_PTR(&analogbufio_bufferedin_read_obj)},
{ MP_ROM_QSTR(MP_QSTR_readinto), MP_ROM_PTR(&analogbufio_bufferedin_readinto_obj)},
@ -32,9 +32,9 @@
extern const mp_obj_type_t analogbufio_bufferedin_type;
void common_hal_analogbufio_bufferedin_construct(analogbufio_bufferedin_obj_t *self, const mcu_pin_obj_t *pin, uint8_t *buffer, uint32_t len, uint8_t bytes_per_sample, bool samples_signed, uint32_t sample_rate);
void common_hal_analogbufio_bufferedin_construct(analogbufio_bufferedin_obj_t *self, const mcu_pin_obj_t *pin, uint32_t sample_rate);
void common_hal_analogbufio_bufferedin_deinit(analogbufio_bufferedin_obj_t *self);
bool common_hal_analogbufio_bufferedin_deinited(analogbufio_bufferedin_obj_t *self);
void common_hal_analogbufio_bufferedin_read(analogbufio_bufferedin_obj_t *self);
uint32_t common_hal_analogbufio_bufferedin_readinto(analogbufio_bufferedin_obj_t *self, uint8_t *buffer, uint32_t len, uint8_t bytes_per_sample);
@ -40,22 +40,6 @@
//| call :py:meth:`!deinit` or use a context manager. See
//| :ref:`lifetime-and-contextmanagers` for more info.
//| For example::
//| import analogbufio
//| import array
//| from board import *
//| length = 5000000
//| mybuffer = array.array("H", 0x0000 for i in range(length))
//| adc_in = analogbufio.BufferedIn(GP26, mybuffer, length)
//| print(*mybuffer)
//| adc_in.deinit()
//| This example will initialize the the device, read and fill
//| :py:data:`~analogbufio.BufferedIn` to mybuffer
//| TODO: For the essentials of `analogbufio`, see the `CircuitPython Essentials
//| Learn guide <>`_
Reference in New Issue
Block a user