I'm using a Raspberry Pi 4B and a development kit which contains sensors and sends the data via BLE.
The project consists of a Raspberry Pi working as an Edge device where it reads sensor data via BLE, filters the data and processes it. For the processing part, I want to process the same data with two Machine Learning models and with programmable logic (doing math calculations with the data). The output I want to achieve is to know the position of my development kit (it goes on top of an elevator roof, so I can know the vertical position and also the position of the doors).
When I try to run the processing code separately (one project with the programmable logic and another with the two ML models) they work fine, but when I join them the thread in charge of the programmable logic processing doesn't work well.
from bluepy import btle
import threading
import numpy as np
import tensorflow as tf
import time
import variables as var
import functions as func
class MyDelegate(btle.DefaultDelegate):
    """BLE notification delegate that decodes sensor packets into ``var``.

    Every notification is decoded into the accelerometer and magnetometer
    slots of the shared ``variables`` module and filtered. Depending on
    ``var.calculate_average`` the sample is then either handed to the
    averaging routine or published to the processing threads.
    """

    # The last field read is data[18:20], so shorter payloads cannot be parsed.
    _MIN_PACKET_LEN = 20

    def __init__(self):
        btle.DefaultDelegate.__init__(self)

    @staticmethod
    def _read_i16(data, offset):
        """Decode the little-endian signed 16-bit integer starting at *offset*."""
        return int.from_bytes(data[offset:offset + 2], "little", signed=True)

    def handleNotification(self, cHandle, data):
        """Decode one notification payload and publish it to the worker threads.

        Args:
            cHandle: Handle of the notifying characteristic (unused).
            data: Raw payload; bytes 2-7 hold the accelerometer axes and
                bytes 14-19 the magnetometer axes.
        """
        if len(data) < self._MIN_PACKET_LEN:
            # Slicing a truncated payload would silently decode wrong or
            # zero values; skip the packet instead of feeding them on.
            return

        var.acc_z[0] = self._read_i16(data, 6)
        var.acc_y[0] = self._read_i16(data, 4)
        var.acc_x[0] = self._read_i16(data, 2)
        # Convert mGauss to Gauss
        var.mag_x[0] = self._read_i16(data, 14) / 1000
        var.mag_y[0] = self._read_i16(data, 16) / 1000
        var.mag_z[0] = self._read_i16(data, 18) / 1000

        func.Filter_Data()

        if var.calculate_average == 1:
            func.Calculate_Average()
            return

        # NOTE(review): the pasted source lost its indentation. Everything
        # below is assumed to belong to the non-averaging branch - confirm
        # against the original file.
        var.ready_read = True

        # The newest sample is pushed on every notification; only the
        # "window ready" flag depends on the step counter. The flag is raised
        # after the buffer is updated so a reader sees the complete window.
        var.window_buffer_DOORS = (
            var.window_buffer_DOORS[1:] + [var.f_acc_x[0] - var.average_acc_x]
        )
        if var.count_buffer_DOORS < var.WINDOW_STEP_DOORS - 1:
            var.count_buffer_DOORS += 1
        else:
            var.count_buffer_DOORS = 0
            var.ready_model_DOORS = 1

        var.window_buffer_CAB = (
            var.window_buffer_CAB[1:] + [var.f_acc_y[0] - var.average_acc_y]
        )
        if var.count_buffer_CAB < var.WINDOW_STEP_CAB - 1:
            var.count_buffer_CAB += 1
        else:
            var.count_buffer_CAB = 0
            var.ready_model_CAB = 1
#Thread where programmed logic calculations run
def programmedLogic_THREAD():
    """Run the programmed-logic calculations whenever a new sample is ready.

    Polls ``var.ready_read`` until ``var.stop_thread`` is set. For each ready
    sample it runs the ``func`` calculation routines, maintains the running
    average bookkeeping, and writes the cab/doors labels to ``var.DB_LP_cab``
    and ``var.DB_LP_doors``.
    """
    idle_delay = 0.001  # seconds to sleep between polls when nothing is ready

    while not var.stop_thread:
        if not var.ready_read:
            # Sleeping while idle releases the GIL; an unthrottled polling
            # loop competes with the other worker threads for interpreter
            # time even when it has nothing to do.
            time.sleep(idle_delay)
            continue

        func.Calculate_Desv_Est_Mag()
        func.Calculate_Position_Cab()
        if var.flag_ready == 1 and var.learn_mag == 1:
            func.Learn_Mag()
        # NOTE(review): indentation was lost in the pasted source; this call
        # is assumed to run for every sample, not only while learning.
        func.Calculate_Position_Doors()

        if var.update_average == 1:
            func.Update_Average()
        else:
            # Not updating: reset the accumulators.
            var.count_average = 0
            var.sum_acc_z = 0
            var.sum_acc_y = 0
            var.sum_acc_x = 0

        if var.state == 1:
            var.DB_LP_cab = "Up"
        elif var.state == -1:
            var.DB_LP_cab = "Down"
        elif var.state == 0:
            var.DB_LP_cab = "Still"

        if var.move_doors == 1:
            var.DB_LP_doors = "Opening"
        elif var.move_doors == -1:
            var.DB_LP_doors = "Closing"
        elif var.move_doors == 0:
            var.DB_LP_doors = "Still"

        # Mark the sample as consumed only after all calculations finished.
        var.ready_read = False
#Thread where ML Doors Model runs and predicts
def doorsML_THREAD():
    """Run the doors ML model on each completed window and publish its label.

    Polls until ``var.stop_thread`` is set. A prediction is made only when
    ``var.ready_model_DOORS`` is 1 and ``var.ML_flag`` is true. The displayed
    state changes only once every entry of ``var.array_doors_State`` agrees.
    """
    idle_delay = 0.001  # seconds to sleep between polls when nothing is ready

    while not var.stop_thread:
        if not (var.ready_model_DOORS == 1 and var.ML_flag):
            # Sleeping while idle releases the GIL; an unthrottled polling
            # loop competes with the other worker threads for interpreter
            # time even when it has nothing to do.
            time.sleep(idle_delay)
            continue

        # Clear the flag before predicting so a window completed during the
        # prediction is not lost.
        var.ready_model_DOORS = 0

        # Shape the window as a single row: (1, window length).
        window = np.expand_dims(var.window_buffer_DOORS, axis=1)
        var.result_DOORS = custom_model_puertas.predict(window.T.tolist(), verbose=0)
        var.doors_State = np.argmax(var.result_DOORS)

        # Keep a fixed-length history of the most recent predicted classes.
        var.array_doors_State = var.array_doors_State[1:] + [var.doors_State]
        first_state = var.array_doors_State[0]
        if all(state == first_state for state in var.array_doors_State):
            var.print_doors_State = first_state

        # NOTE(review): indentation was lost in the pasted source; the label
        # mapping is assumed to run after every prediction.
        if var.print_doors_State == 0:
            var.DB_ML_doors = "Opening"
        elif var.print_doors_State == 1:
            var.DB_ML_doors = "Closing"
        elif var.print_doors_State == 2:
            var.DB_ML_doors = "Still"
#Thread where ML Cab Model runs and predicts
def cabML_THREAD():
    """Run the cab ML model on each completed window and publish its label.

    Polls until ``var.stop_thread`` is set. A prediction is made only when
    ``var.ready_model_CAB`` is 1 and ``var.ML_flag`` is true. The displayed
    state changes only once every entry of ``var.array_cab_State`` agrees.
    """
    idle_delay = 0.001  # seconds to sleep between polls when nothing is ready

    while not var.stop_thread:
        if not (var.ready_model_CAB == 1 and var.ML_flag):
            # Sleeping while idle releases the GIL; an unthrottled polling
            # loop competes with the other worker threads for interpreter
            # time even when it has nothing to do.
            time.sleep(idle_delay)
            continue

        # Clear the flag before predicting so a window completed during the
        # prediction is not lost.
        var.ready_model_CAB = 0

        # Shape the window as a single row: (1, window length).
        window = np.expand_dims(var.window_buffer_CAB, axis=1)
        var.result_CAB = custom_model_cabina.predict(window.T.tolist(), verbose=0)
        var.cab_State = np.argmax(var.result_CAB)

        # Keep a fixed-length history of the most recent predicted classes.
        var.array_cab_State = var.array_cab_State[1:] + [var.cab_State]
        first_state = var.array_cab_State[0]
        if all(state == first_state for state in var.array_cab_State):
            var.print_cab_State = first_state

        # NOTE(review): indentation was lost in the pasted source; the label
        # mapping is assumed to run after every prediction.
        if var.print_cab_State == 0:
            var.DB_ML_cab = "Down"
        elif var.print_cab_State == 1:
            var.DB_ML_cab = "Still"
        elif var.print_cab_State == 2:
            var.DB_ML_cab = "Up"
#Thread that prints results in terminal
def print_THREAD():
    """Print the current cab/doors states to the terminal every 0.5 s.

    Every 20 iterations both ``var.programmedLogic_flag`` and ``var.ML_flag``
    are inverted. The programmed-logic results are printed while
    ``var.programmedLogic_flag`` is true, otherwise the ML results are
    printed while ``var.ML_flag`` is true. Runs until ``var.stop_thread``
    is set.
    """
    ticks = 0
    while not var.stop_thread:
        ticks += 1
        if ticks >= 20:
            ticks = 0
            var.programmedLogic_flag = not var.programmedLogic_flag
            var.ML_flag = not var.ML_flag

        # NOTE(review): indentation was lost in the pasted source; printing
        # is assumed to happen on every iteration, not only when the flags
        # are toggled.
        if var.programmedLogic_flag:
            print(f"\n\tLP Data:\nCab state:\tDoors state:\n{var.DB_LP_cab}\t\t{var.DB_LP_doors}")
        elif var.ML_flag:
            print(f"\n\tML Data:\nCab state:\tDoors state:\n{var.DB_ML_cab}\t\t{var.DB_ML_doors}")

        time.sleep(0.5)
#Create threads
thread1 = threading.Thread(target=programmedLogic_THREAD)
thread2 = threading.Thread(target=doorsML_THREAD)
thread3 = threading.Thread(target=cabML_THREAD)
thread4 = threading.Thread(target=print_THREAD)

#Upload ML Models
print("Uploading ML Models...")
custom_model_puertas = tf.keras.models.load_model('/home/aag/Desktop/Edge_Impulse_Model/doors_MODEL.h5')
custom_model_cabina = tf.keras.models.load_model('/home/aag/Desktop/Edge_Impulse_Model/cab_MODEL.h5')
print("Successfully uploaded")

#Connect to BLE device
print("Connecting ...")
p = btle.Peripheral(var.MAC_ADDR, addrType=btle.ADDR_TYPE_RANDOM)
p.setDelegate(MyDelegate())
# Report the address that was actually used instead of a hard-coded one.
print(f"Connected to {var.MAC_ADDR}")

# Setup to turn notifications on
svc = p.getServiceByUUID("00000000-0001-11e1-9ab4-0002a5d5c51b")
ch = svc.getCharacteristics("00e00000-0001-11e1-ac36-0002a5d5c51b")[0]
# NOTE(review): assumes the notification-enable descriptor sits at the
# handle right after the characteristic value - confirm for this device.
p.writeCharacteristic(ch.valHandle + 1, b"\x01\x00")

#Initialize threads
thread1.start()
thread2.start()
thread3.start()
thread4.start()

try:
    while True:
        # Blocks for up to 1 s; incoming notifications are delivered to the
        # delegate set above.
        p.waitForNotifications(1.0)
except KeyboardInterrupt:
    # Ctrl+C is the normal way to stop the program.
    pass
finally:
    # Runs for Ctrl+C and for any other error (e.g. a lost BLE link), so the
    # non-daemon worker threads are always told to stop and the process can
    # exit instead of hanging.
    p.disconnect()
    print("\nDevice disconnected")
    var.stop_thread = True
    thread1.join()
    thread2.join()
    thread3.join()
    thread4.join()
In the project I have added I use two files (variables and functions) where I have all my variables and functions defined. I have used 4 threads so that I can have several tasks running at the same time. Also, I read the BLE data via notifications, so I have a callback function that works like an interrupt handler.
When I initialize all of the threads, the thread in charge of the programmable logic doesn't work (it does the calculations wrong), but when I don't initialize threads 2 and 3 (in charge of the ML predictions), the programmable logic thread works perfectly fine (so I can rule out that the functions I am using to calculate the position are wrong).
I attach below a screenshot of the command htop:
Am I making a mistake or missing something?