0

I'm using a Raspberry Pi 4B and a development kit which contains sensors and sends the data via BLE.

The project consists of a Raspberry Pi working as an Edge device where it reads sensor data via BLE, filters the data and processes it. For the processing part, I want to process the same data with two Machine Learning models and with programmable logic (doing math calculations with the data). The output I want to achieve is to know the position of my development kit (it goes on top of an elevator roof, so I can know the vertical position and also the position of the doors).

When I try to run the processing code separately (one project with the programmable logic and another with the two ML models) they work fine, but when I join them the thread in charge of the programmable logic processing doesn't work well.

from bluepy import btle
import threading
import numpy as np
import tensorflow as tf
import time
import variables as var
import functions as func

class MyDelegate(btle.DefaultDelegate):
    """bluepy delegate that unpacks each sensor notification into ``var``.

    Every notification updates the raw accelerometer/magnetometer slots,
    runs the filter, and then either feeds the averaging routine or pushes
    the newest filtered samples into the sliding windows used by the ML
    models.
    """

    def __init__(self):
        super().__init__()

    def handleNotification(self, cHandle, data):
        """Decode one notification payload and update the shared state.

        Args:
            cHandle: Handle of the notifying characteristic (not used here).
            data: Raw payload; 16-bit signed little-endian fields are read
                from byte offsets 2-7 (accelerometer) and 14-19
                (magnetometer).
        """
        def read_int16(offset):
            # Two bytes, little-endian, signed.
            return int.from_bytes(data[offset:offset + 2], "little", signed=True)

        var.acc_z[0] = read_int16(6)
        var.acc_y[0] = read_int16(4)
        var.acc_x[0] = read_int16(2)
        # Magnetometer values arrive in mGauss; store them in Gauss.
        var.mag_x[0] = read_int16(14) / 1000
        var.mag_y[0] = read_int16(16) / 1000
        var.mag_z[0] = read_int16(18) / 1000

        func.Filter_Data()

        if var.calculate_average == 1:
            func.Calculate_Average()
            return

        # Tell the programmed-logic thread that a new filtered sample exists.
        var.ready_read = True

        # Doors window: slide by one sample; every WINDOW_STEP_DOORS samples
        # flag the doors model as ready to predict.
        doors_step_done = var.count_buffer_DOORS >= var.WINDOW_STEP_DOORS - 1
        var.count_buffer_DOORS = 0 if doors_step_done else var.count_buffer_DOORS + 1
        doors_sample = var.f_acc_x[0] - var.average_acc_x
        var.window_buffer_DOORS = var.window_buffer_DOORS[1:] + [doors_sample]
        if doors_step_done:
            var.ready_model_DOORS = 1

        # Cab window: same scheme, using the filtered Y acceleration.
        cab_step_done = var.count_buffer_CAB >= var.WINDOW_STEP_CAB - 1
        var.count_buffer_CAB = 0 if cab_step_done else var.count_buffer_CAB + 1
        cab_sample = var.f_acc_y[0] - var.average_acc_y
        var.window_buffer_CAB = var.window_buffer_CAB[1:] + [cab_sample]
        if cab_step_done:
            var.ready_model_CAB = 1

#Thread where programmed logic calculations run
def programmedLogic_THREAD(poll_interval=0.001):
    """Run the programmed-logic position calculations for each new sample.

    Loops until ``var.stop_thread`` becomes true. Whenever the BLE callback
    has raised ``var.ready_read`` the cab/door calculations are executed and
    the human-readable states ``var.DB_LP_cab`` / ``var.DB_LP_doors`` are
    refreshed.

    Args:
        poll_interval: Seconds to sleep when no new sample is pending, so the
            loop yields the CPU (and the GIL) instead of spinning.
    """
    while not var.stop_thread:
        if not var.ready_read:
            # Nothing to do yet: yield so the BLE callback and the other
            # threads are not starved by a busy-wait loop.
            time.sleep(poll_interval)
            continue

        # Clear the flag BEFORE processing (as the ML threads do with their
        # ready flags). Clearing it afterwards would silently discard a
        # notification that arrives while the calculations are running.
        # NOTE: the values in ``var`` are still read without a lock.
        var.ready_read = False

        func.Calculate_Desv_Est_Mag()
        func.Calculate_Position_Cab()
        if var.flag_ready == 1 and var.learn_mag == 1:
            func.Learn_Mag()
        func.Calculate_Position_Doors()

        if var.update_average == 1:
            func.Update_Average()
        else:
            # Reset the running-average accumulators.
            var.count_average = 0
            var.sum_acc_z = 0
            var.sum_acc_y = 0
            var.sum_acc_x = 0

        # Translate the numeric cab state into its display label.
        if var.state == 1:
            var.DB_LP_cab = "Up"
        elif var.state == -1:
            var.DB_LP_cab = "Down"
        elif var.state == 0:
            var.DB_LP_cab = "Still"

        # Translate the numeric door movement into its display label.
        if var.move_doors == 1:
            var.DB_LP_doors = "Opening"
        elif var.move_doors == -1:
            var.DB_LP_doors = "Closing"
        elif var.move_doors == 0:
            var.DB_LP_doors = "Still"

        
#Thread where ML Doors Model runs and predicts
def doorsML_THREAD(poll_interval=0.001):
    """Run the doors model whenever a new window of samples is ready.

    Loops until ``var.stop_thread`` becomes true. A prediction is made when
    ``var.ready_model_DOORS`` is 1 and ``var.ML_flag`` is set; the label in
    ``var.DB_ML_doors`` is only updated once every entry of the recent
    prediction history ``var.array_doors_State`` agrees.

    Args:
        poll_interval: Seconds to sleep when there is nothing to predict, so
            the loop yields the CPU (and the GIL) instead of spinning.
    """
    while not var.stop_thread:
        if not (var.ready_model_DOORS == 1 and var.ML_flag):
            # Idle: avoid a busy-wait that competes with the BLE callback
            # and the programmed-logic thread.
            time.sleep(poll_interval)
            continue

        var.ready_model_DOORS = 0
        x = np.expand_dims(var.window_buffer_DOORS, axis=1)
        var.result_DOORS = custom_model_puertas.predict(x.T.tolist(), verbose=0)
        var.doors_State = np.argmax(var.result_DOORS)

        # Slide the prediction history and require unanimity before
        # publishing a new label.
        var.array_doors_State = var.array_doors_State[1:] + [var.doors_State]
        history = var.array_doors_State
        if all(element == history[0] for element in history):
            var.print_doors_State = history[0]
            if var.print_doors_State == 0:
                var.DB_ML_doors = "Opening"
            elif var.print_doors_State == 1:
                var.DB_ML_doors = "Closing"
            elif var.print_doors_State == 2:
                var.DB_ML_doors = "Still"


#Thread where ML Cab Model runs and predicts
def cabML_THREAD(poll_interval=0.001):
    """Run the cab model whenever a new window of samples is ready.

    Loops until ``var.stop_thread`` becomes true. A prediction is made when
    ``var.ready_model_CAB`` is 1 and ``var.ML_flag`` is set; the label in
    ``var.DB_ML_cab`` is only updated once every entry of the recent
    prediction history ``var.array_cab_State`` agrees.

    Args:
        poll_interval: Seconds to sleep when there is nothing to predict, so
            the loop yields the CPU (and the GIL) instead of spinning.
    """
    while not var.stop_thread:
        if not (var.ready_model_CAB == 1 and var.ML_flag):
            # Idle: avoid a busy-wait that competes with the BLE callback
            # and the programmed-logic thread.
            time.sleep(poll_interval)
            continue

        var.ready_model_CAB = 0
        x = np.expand_dims(var.window_buffer_CAB, axis=1)
        var.result_CAB = custom_model_cabina.predict(x.T.tolist(), verbose=0)
        var.cab_State = np.argmax(var.result_CAB)

        # Slide the prediction history and require unanimity before
        # publishing a new label.
        var.array_cab_State = var.array_cab_State[1:] + [var.cab_State]
        history = var.array_cab_State
        if all(element == history[0] for element in history):
            var.print_cab_State = history[0]
            if var.print_cab_State == 0:
                var.DB_ML_cab = "Down"
            elif var.print_cab_State == 1:
                var.DB_ML_cab = "Still"
            elif var.print_cab_State == 2:
                var.DB_ML_cab = "Up"


#Thread that prints results in terminal
def print_THREAD():
    """Print the current cab/doors state twice per second.

    Every 20 iterations both ``var.programmedLogic_flag`` and ``var.ML_flag``
    are inverted, which switches the output between the programmed-logic
    results and the ML results. Runs until ``var.stop_thread`` is set.
    """
    ticks = 0
    while not var.stop_thread:
        # Wrap the counter at 20; hitting zero marks the moment to swap modes.
        ticks = (ticks + 1) % 20
        if ticks == 0:
            var.programmedLogic_flag = not var.programmedLogic_flag
            var.ML_flag = not var.ML_flag

        if var.programmedLogic_flag:
            print(f"\n\tLP Data:\nCab state:\tDoors state:\n{var.DB_LP_cab}\t\t{var.DB_LP_doors}")
        elif var.ML_flag:
            print(f"\n\tML Data:\nCab state:\tDoors state:\n{var.DB_ML_cab}\t\t{var.DB_ML_doors}")

        time.sleep(0.5)

#Create threads
thread1 = threading.Thread(target=programmedLogic_THREAD)
thread2 = threading.Thread(target=doorsML_THREAD)
thread3 = threading.Thread(target=cabML_THREAD)
thread4 = threading.Thread(target=print_THREAD)
worker_threads = (thread1, thread2, thread3, thread4)

#Upload ML Models
print("Uploading ML Models...")
custom_model_puertas = tf.keras.models.load_model('/home/aag/Desktop/Edge_Impulse_Model/doors_MODEL.h5')
custom_model_cabina = tf.keras.models.load_model('/home/aag/Desktop/Edge_Impulse_Model/cab_MODEL.h5')
print("Successfully uploaded")

#Connect to BLE device
print("Connecting ...")
p = btle.Peripheral(var.MAC_ADDR, addrType=btle.ADDR_TYPE_RANDOM)
p.setDelegate(MyDelegate())
# Report the address actually used instead of a hard-coded one.
print(f"Connected to {var.MAC_ADDR}")

# Setup to turn notifications on
svc = p.getServiceByUUID("00000000-0001-11e1-9ab4-0002a5d5c51b")
ch = svc.getCharacteristics("00e00000-0001-11e1-ac36-0002a5d5c51b")[0]

# NOTE(review): assumes the notification-enable descriptor sits at the handle
# right after the characteristic value - confirm for this device.
p.writeCharacteristic(ch.valHandle + 1, b"\x01\x00")

#Initialize threads
for worker in worker_threads:
    worker.start()

try:
    # Pump BLE notifications; MyDelegate.handleNotification does the work.
    while True:
        p.waitForNotifications(1.0)
except KeyboardInterrupt:
    # Ctrl+C is the normal way to stop the program; cleanup happens below.
    pass
finally:
    # Always stop and join the workers, even if the BLE link failed with an
    # exception; otherwise the non-daemon threads keep the process alive.
    var.stop_thread = True
    for worker in worker_threads:
        worker.join()
    p.disconnect()
    print("\nDevice disconnected")


In the project I have added, I use two files (variables and functions) where all my variables and functions are defined. I use 4 threads so that several tasks can run at the same time. I also read the BLE data via notifications, so I have a callback that works like an interrupt handler.

When I start all of the threads, the thread in charge of the programmable logic doesn't work (it does the calculations wrong), but when I don't start threads 2 and 3 (in charge of the ML predictions), the programmable logic thread works perfectly fine (so I can rule out that the functions I am using to calculate the position are wrong).

I attach below a screenshot of the command htop:

screenshot of htop command

Am I making a mistake or missing something?

2
  • In general, concurrency adds a lot of complexity and problems, like race conditions (mentioned in Ahmed's answer) but also others like deadlocks, livelocks etc. There are certain principles to try to conquer that complexity and avoid certain issues. I recommend reading up on the topic before continuing with multithreading. Using threadsafe structures, like Ahmed suggests, is your best and easiest remedy though. Commented May 3, 2024 at 13:02
  • Please extract a minimal reproducible example from your code and post that, not the whole thing. As a new user here, also take the tour and read How to Ask. Commented May 4, 2024 at 7:07

1 Answer 1

1

Modifying and accessing a global variable from multiple threads is a potential race condition; this is why globals are evil.

import variables as var

var.acc_z[0] = int.from_bytes(data[6:8],"little",signed=True)
# multiple other accesses to this variable..

This var is spread throughout the code, and any access to it is a race condition.

Remove this var entirely, and either replace it with multiple global queues or pass the queue to the threaded function as an argument. Note that queues in Python are thread-safe.

from queue import Queue

# One queue per consumer thread: the sensor side puts new data on these and
# each ML thread reads only from its own queue.
from_sensor_to_ML1_thread = Queue()
from_sensor_to_ML2_thread = Queue()
# etc ...

When a thread wants to send any information to another thread it will put the new piece of information on that queue, and the other thread will periodically read data from that queue.

# sensor thread
new_sensor_data = {'x': 5}  # example payload
from_sensor_to_ML1_thread.put(new_sensor_data)

# ML1 thread
# (fragment: this runs inside the consumer thread's loop, hence the `break`)
new_sensor_data = from_sensor_to_ML1_thread.get()
# None is the shutdown sentinel sent by the "when terminating" step below.
if new_sensor_data is None:
    break

# when terminating
from_sensor_to_ML1_thread.put(None)

For more information on the topic you should read about the producer consumer pattern.

Lastly, you may find that you should be using multiprocessing instead of multithreading to speed up your computation, in which case porting this code is just a matter of switching queue.Queue to multiprocessing.Queue.

Sign up to request clarification or add additional context in comments.

2 Comments

Thanks for answering so fast! I am going to try to use queues and see if that's it! My question is: how can I use queues if I want to access the same variable from different threads (the acceleration data is accessed from the programmable logic thread and from one of the ML threads)? By using two queues (queue_from_interrupt_to_thread1 and queue_from_interrupt_to_thread2)?
@aag You can put the same data on both queues, q1.put(data); q2.put(data), in which case both threads can read it. However, you should never modify that object from either of the two ML threads, otherwise you are back to the same problem. The problem is that you don't control who writes data from one thread to the other, or when; queues solve this as long as the queue is the only form of communication between the threads.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.