tests/thread: Add tests for running GC within a thread, and heap stress.
This commit is contained in:
parent
c93d9caa8b
commit
dcc7c5bd89
|
@ -0,0 +1,34 @@
|
|||
# test that we can run the garbage collector within threads
|
||||
#
|
||||
# MIT license; Copyright (c) 2016 Damien P. George on behalf of Pycom Ltd
|
||||
|
||||
import gc
|
||||
import _thread
|
||||
|
||||
def thread_entry(n):
|
||||
# allocate a bytearray and fill it
|
||||
data = bytearray(i for i in range(256))
|
||||
|
||||
# do some work and call gc.collect() a few times
|
||||
for i in range(n):
|
||||
for i in range(len(data)):
|
||||
data[i] = data[i]
|
||||
gc.collect()
|
||||
|
||||
# print whether the data remains intact and indicate we are finished
|
||||
with lock:
|
||||
print(list(data) == list(range(256)))
|
||||
global n_finished
|
||||
n_finished += 1
|
||||
|
||||
lock = _thread.allocate_lock()
|
||||
n_thread = 4
|
||||
n_finished = 0
|
||||
|
||||
# spawn threads
|
||||
for i in range(n_thread):
|
||||
_thread.start_new_thread(thread_entry, (10,))
|
||||
|
||||
# busy wait for threads to finish
|
||||
while n_finished < n_thread:
|
||||
pass
|
|
@ -0,0 +1,42 @@
|
|||
# stress test for the heap by allocating lots of objects within threads
|
||||
# allocates about 5mb on the heap
|
||||
#
|
||||
# MIT license; Copyright (c) 2016 Damien P. George on behalf of Pycom Ltd
|
||||
|
||||
import _thread
|
||||
|
||||
def last(l):
|
||||
return l[-1]
|
||||
|
||||
def thread_entry(n):
|
||||
# allocate a bytearray and fill it
|
||||
data = bytearray(i for i in range(256))
|
||||
|
||||
# run a loop which allocates a small list and uses it each iteration
|
||||
lst = 8 * [0]
|
||||
sum = 0
|
||||
for i in range(n):
|
||||
sum += last(lst)
|
||||
lst = [0, 0, 0, 0, 0, 0, 0, i + 1]
|
||||
|
||||
# check that the bytearray still has the right data
|
||||
for i, b in enumerate(data):
|
||||
assert i == b
|
||||
|
||||
# print the result of the loop and indicate we are finished
|
||||
with lock:
|
||||
print(sum, lst[-1])
|
||||
global n_finished
|
||||
n_finished += 1
|
||||
|
||||
lock = _thread.allocate_lock()
|
||||
n_thread = 10
|
||||
n_finished = 0
|
||||
|
||||
# spawn threads
|
||||
for i in range(n_thread):
|
||||
_thread.start_new_thread(thread_entry, (10000,))
|
||||
|
||||
# busy wait for threads to finish
|
||||
while n_finished < n_thread:
|
||||
pass
|
Loading…
Reference in New Issue