Update
The code (untested for obvious reasons) has been redone to use child processes rather than threads because I was having difficulty with handling KeyboardInterrupt exception events when I had threads waiting on threading.Event instances. Perhaps somebody can explain why. Anyway, using processes can potentially improve performance since each process has its own GIL. But this requires now for the numpy array, self.buff and new attribute self.elapsed_time to be stored in shared memory so that all processes can share this data.
If I understand your question, you want to repeatedly gather data and plot it periodically. We need careful synchronization between the two threads and for this we will need several events:
self.start_event - This event is awaited by adcreceiver in order to start receiving data.
self.started_event - This event is set by adcreceiver to let halleffect_monitor know that it has started receiving data.
self.stop_event - This event is set by halleffect_monitor to tell adcreceiver to stop processing in case it is still receiving data.
self.stopped_event - This event is set by adcreceiver to let halleffect_monitor know that it has stopped processing.
start_plotting_event - Tell the main thread to start plotting.
self.plotting_completed_event - Set by the main thread to let halleffect_monitor that plotting has been completed.
This seems like a lot of events, but I believe they are required so that events can be cleared for the next receiving/plotting cycle without worrying about any possible race conditions.
from multiprocessing import Process, Event, Array, Value
import numpy as np
import time
def np_array_to_shared_array(np_array):
shared_array = Array('B', np_array.nbytes, lock=False)
arr = np.frombuffer(shared_array, np_array.dtype)
arr[:] = np_array.flatten(order='C')
return shared_array
def shared_array_to_np_array(shared_array, shape, dtype):
return np.ndarray(shape, dtype=dtype, buffer=shared_array)
class halleffectandadc:
def __init__(self, desired_clock_cycles, pin, clock_speed, buff_size):
self.desired_clock_cycles = desired_clock_cycles
self.pin = pin
self.clock_speed = clock_speed
self.buff_size = buff_size
# Create np array in shared memory:
arr = np.ndarray((buff_size,), dtype="uint16")
shared_array = np_array_to_shared_array(arr)
self.buff = shared_array_to_np_array(shared_array, arr.shape, arr.dtype)
self.elapsed_time = Value('f', lock=False) # Shared memory floating point
self.start_event = Event()
self.started_event = Event()
self.stop_event = Event()
self.stopped_event = Event()
self.start_plotting_event = Event()
self.plotting_completed_event = Event()
def get_adc_val(self):
response = self.spi.xfer([0x00, 0x00])
return (response[0] << 8) + response[1]
def adcreceiver(self):
self.spi = spidev.SpiDev()
self.spi.open(0, 0)
self.spi.max_speed_hz = self.clock_speed
try:
while True:
# Wait for start event:
self.start_event.wait()
self.start_event.clear() # clear for next time
self.started_event.set() # show we have started
print("The process is starting to collect data", flush=True)
time_start = time.perf_counter()
i = 0
while i < self.buff_size and not self.stop_event.is_set():
self.buff[i] = self.get_adc_val()
i += 1
print(i, flush=True)
self.elapsed_time.value = time.perf_counter() - time_start
self.stopped_event.set() # show we have stopped
print("The process has been stopped to plot the data", flush=True)
except KeyboardInterrupt:
self.spi.close()
print('adcreceiver terminating', flush=True)
except Exception as e:
print(e, flush=True)
def halleffect_monitor(self):
GPIO.setmode(GPIO.BCM)
GPIO.setup(pin, GPIO.IN)
try:
while True:
# Zero out buffer:
self.buff.fill(0)
# clear events related to stopping
self.stop_event.clear()
self.stopped_event.clear()
# start the adcreceiver thread:
self.start_event.set()
# wait for adcreceiver to start:
self.started_event.wait()
# at this point the self.start_event has been cleared by adcreceiver
# so if adcreceiver stops because its buffer has been filled it will
# block until self.start_event is set again
hall_effect_clock_count = 0
while hall_effect_clock_count < self.desired_clock_cycles:
print("Waiting....", flush=True)
if GPIO.wait_for_edge(self.pin, GPIO.RISING):
hall_effect_clock_count += 1
print(f"Magnetic Field Detected, Number of Magnetic Fields Detected: {hall_effect_clock_count}", flush=True)
# stop the adcreceiver thread if it hasn't already stopped:
self.stop_event.set()
# wait for the adcreceiver thread to have stopped:
self.stopped_event.wait()
# plot the data:
self.plotting_completed_event.clear()
self.start_plotting_event.set()
self.plotting_completed_event.wait()
except KeyboardInterrupt:
print('halleffect_monitor terminating', flush=True)
except Exception as e:
print(e, flush=True)
def plot_data(self):
time_points = np.linspace(0, self.elapsed_time.value, self.buff_size, endpoint=True)
self.buff[0] = self.buff[1]
self.buff[1] = self.buff[2]
plt.plot(time_points, self.buff / 65535 * 5)
plt.xlabel("Elapsed Time (s)", fontsize=12)
plt.title("Change in Potentiometer Wiper Voltage", fontsize=12)
plt.ylabel("Voltage (V)", fontsize=12)
plt.show()
print(
f"dT: {elapsed_time}s, STD: {np.std(self.buff):.2f}, MIN: {min(self.buff)}, MAX: {max(self.buff)}, AVG: {np.mean(self.buff):.2f}"
)
def main():
try:
adc1 = halleffectandadc(1, 17, 22000000, 100000)
p1 = Process(target=adc1.halleffect_monitor)
p2 = Process(target=adc1.adcreceiver)
p1.start()
p2.start()
print('Type Ctrl-c to end up ...')
while True:
adc1.start_plotting_event.wait()
adc1.start_plotting_event.clear()
adc1.plot_data()
adc1.plotting_completed_event.set()
except KeyboardInterrupt:
pass
print('Waiting for processes to finish up ...')
p1.join()
p2.join()
if __name__ == '__main__':
main()
Multithreading Version
I believe I figured out a resolution to the problem with receiving KeyboardInterrupt exceptions when waiting on a threading.Event instance, which is to use instead a multiprocessing.Event instance.
import threading
import multiprocessing
import numpy as np
import time
class halleffectandadc:
def __init__(self, desired_clock_cycles, pin, clock_speed, buff_size):
self.start_event = threading.Event()
self.started_event = threading.Event()
self.stop_event = threading.Event()
self.stopped_event = threading.Event()
# Note the use of a multiprocessing.Event:
self.start_plotting_event = multiprocessing.Event()
self.plotting_completed_event = threading.Event()
self.spi = spidev.SpiDev()
self.spi.open(0, 0)
self.spi.max_speed_hz = clock_speed
self.buff_size = buff_size
self.buff = np.ndarray((self.buff_size,), dtype="uint16")
self.desired_clock_cycles = desired_clock_cycles
self.pin = pin
GPIO.setmode(GPIO.BCM)
GPIO.setup(pin, GPIO.IN)
def get_adc_val(self):
response = self.spi.xfer([0x00, 0x00])
return (response[0] << 8) + response[1]
def adcreceiver(self):
while True:
# Wait for start event:
self.start_event.wait()
self.start_event.clear() # clear for next time
self.started_event.set() # show we have started
print("The thread is starting to collect data")
self.time_start = time.perf_counter()
i = 0
while i < self.buff_size and not self.stop_event.is_set():
self.buff[i] = self.get_adc_val()
i += 1
print(i)
self.time_end = time.perf_counter()
self.stopped_event.set() # show we have stopped
print("The thread has been stopped to plot the data")
def halleffect_monitor(self):
while True:
# start with an empty buffer for each new cycle:
self.buff.fill(0)
# clear events related to stopping
self.stop_event.clear()
self.stopped_event.clear()
# start the adcreceiver thread:
self.start_event.set()
# wait for adcreceiver to start:
self.started_event.wait()
# at this point the self.start_event has been cleared by adcreceiver
# so if adcreceiver stops because its buffer has been filled it will
# block until self.start_event is set again
hall_effect_clock_count = 0
while hall_effect_clock_count < self.desired_clock_cycles:
print("Waiting....")
if GPIO.wait_for_edge(self.pin, GPIO.RISING):
hall_effect_clock_count += 1
print(f"Magnetic Field Detected, Number of Magnetic Fields Detected: {hall_effect_clock_count}")
# stop the adcreceiver thread if it hasn't already stopped:
self.stop_event.set()
# wait for the adcreceiver thread to have stopped:
self.stopped_event.wait()
# plot the data:
self.plotting_completed_event.clear()
self.start_plotting_event.set()
self.plotting_completed_event.wait()
def plot_data(self):
elapsed_time = self.time_end - self.time_start
time_points = np.linspace(0, elapsed_time, self.buff_size, endpoint=True)
self.buff[0] = self.buff[1]
self.buff[1] = self.buff[2]
plt.plot(time_points, self.buff / 65535 * 5)
plt.xlabel("Elapsed Time (s)", fontsize=12)
plt.title("Change in Potentiometer Wiper Voltage", fontsize=12)
plt.ylabel("Voltage (V)", fontsize=12)
plt.show()
print(
f"dT: {elapsed_time}s, STD: {np.std(self.buff):.2f}, MIN: {min(self.buff)}, MAX: {max(self.buff)}, AVG: {np.mean(self.buff):.2f}"
)
try:
adc1 = halleffectandadc(1, 17, 22000000, 100000)
# Make these daemon threads:
threading.Thread(target=adc1.halleffect_monitor, daemon=True).start()
threading.Thread(target=adc1.adcreceiver, daemon=True).start()
print('Hit Ctrl-C to terminate ...\n')
while True:
adc1.start_plotting_event.wait()
adc1.start_plotting_event.clear()
adc1.plot_data()
adc1.plotting_completed_event.set()
except KeyboardInterrupt:
adc1.spi.close()