-2

I have initialized a timer in a class so that, after a specified number of seconds, it calls a target function. On load, the application displays a welcome screen (GUI) and opens the serial port.

When data is received through the serial port, I want to query my web API for the information and, after displaying it, start the timer that was declared in __init__.

But if more data arrives through the serial port before the timer fires, I want the timer to restart/reset. As long as data keeps coming, the welcome screen should not be shown again; it should only reappear once a full timer interval has passed with no new data.

I found the following in this answer, but it does not trigger the callback function:

from threading import Thread, Timer

class TimerThread(Thread):
    """Restartable inactivity timer that runs in its own thread.

    After ``start()`` the thread idles until the timer is armed with
    ``start_timer()`` or ``restart_timer()``.  Once armed it counts down
    ``timeout`` seconds in steps of ``sleep_chunk`` seconds; when the
    countdown reaches zero the timer disarms itself and invokes
    ``callback(*args)`` on this thread.  ``restart_timer()`` during a
    countdown puts the full timeout back on the clock.

    Args:
        timeout: Seconds of inactivity before the callback fires.
        sleep_chunk: Polling granularity in seconds; it bounds how long
            ``stop_timer()`` and ``terminate()`` take to be noticed.
        callback: Callable invoked on expiry, or ``None`` for no action.
        *args: Positional arguments passed to ``callback``.
    """

    def __init__(self, timeout=3, sleep_chunk=0.25, callback=None, *args):
        # Imported locally because the file only pulls Thread and Timer
        # out of the threading module.
        from threading import Event, Lock

        super().__init__()
        print('timer:initialized')
        self.timeout = timeout
        print('timeout:' + str(timeout))
        self.sleep_chunk = sleep_chunk
        # The 'timeout:' label is kept as-is so output matches existing logs.
        print('timeout:' + str(sleep_chunk))
        self.callback = callback
        if callback is None:
            print('callback:none')
        else:
            print('callback:' + str(callback))
        self.callback_args = args

        self.terminate_event = Event()
        self.start_event = Event()
        self.reset_event = Event()
        # Serialises countdown bookkeeping between run() and the control
        # methods, which are called from other threads.
        self._count_lock = Lock()
        self.count = self._full_count()

    def _full_count(self):
        """Return the number of sleep chunks that make up one full timeout."""
        return self.timeout / self.sleep_chunk

    def run(self):
        """Thread body: count down while armed and fire the callback on expiry.

        Returns once ``terminate()`` has been called; no callback is fired
        after termination has been requested.
        """
        print('timer:run')
        while not self.terminate_event.is_set():
            if not self.start_event.is_set():
                # Idle: block on the event instead of spinning.  The timeout
                # keeps terminate() responsive while the timer is disarmed.
                self.start_event.wait(self.sleep_chunk)
                continue

            # Sleep one chunk, waking early if restart_timer() signals.
            self.reset_event.wait(self.sleep_chunk)

            with self._count_lock:
                # Re-check the flag under the lock so a restart that lands
                # right at expiry is honoured rather than lost.
                if self.reset_event.is_set():
                    self.reset_event.clear()
                    self.count = self._full_count()
                    continue
                if not self.start_event.is_set():
                    # stop_timer() was called during the wait.
                    continue
                self.count -= 1
                if self.count > 0:
                    continue
                # Expired: disarm and rewind before leaving the lock.
                self.start_event.clear()
                self.count = self._full_count()

            if self.terminate_event.is_set():
                break
            print('timeout. calling function...')
            # Called outside the lock so the callback may safely call
            # restart_timer(), stop_timer() or terminate().
            if self.callback is not None:
                self.callback(*self.callback_args)

    def start_timer(self):
        """Arm the timer; the countdown continues from the current count."""
        print('start_timer')
        self.start_event.set()

    def stop_timer(self):
        """Disarm the timer and rewind the countdown to the full timeout."""
        print('stop_timer')
        with self._count_lock:
            self.start_event.clear()
            # Drop any pending reset so it cannot leak into the next run.
            self.reset_event.clear()
            self.count = self._full_count()

    def restart_timer(self):
        """Reset the countdown if the timer is armed, otherwise arm it."""
        print('restart_timer')
        with self._count_lock:
            running = self.start_event.is_set()
            if running:
                self.reset_event.set()
            else:
                self.start_event.set()
        if running:
            print('restart_timer:reset')
        else:
            print('restart_timer:start')

    def terminate(self):
        """Ask the thread to exit; it does so within about one sleep chunk."""
        print('terminate')
        self.terminate_event.set()

class Application(Tk):
    # Main window of the (abridged) barcode kiosk example.  Several pieces are
    # elided in this snippet and replaced by placeholder comments.
    def show_idle_screen(self):
        """Remove every child widget of this window (idle-screen redraw elided)."""
        for widget in self.winfo_children():
            widget.destroy()
        # Create new widget element

    def show_product_info(self, barcodeno):
        """Show info for ``barcodeno`` (elided) and try to arm the idle timer."""
        # call web api and get response
        # create information widget element
        # -Timer(5.0, self.show_idle_screen).start()
        # NOTE(review): a brand-new TimerThread is built on every call and its
        # start() is never invoked, so run() never executes and the callback
        # cannot fire; restart_timer() below only sets an event.
        # NOTE(review): Application() constructs a second Application instance,
        # so the callback is bound to that new object rather than to self.
        # NOTE(review): timeout and sleep_chunk are not defined in this
        # snippet -- presumably module-level values; confirm.
        tmr = TimerThread(timeout, sleep_chunk, Application().show_idle_screen)
        tmr.restart_timer()


    def serial_port_data_received(self):
        """Spawn a worker thread running show_product_info when serial data is waiting."""
        if self.serial.inWaiting() > 0:
            # The rest of handling serial data
            # UPDATED CODE START
            # -if self.idle_timer.is_alive():
            #     -print('cancelling timer')
            #     -self.idle_timer.cancel()
            # UPDATED CODE END
            # NOTE(review): barcodedata is not assigned in the code shown here;
            # presumably it comes from the elided serial handling above.
            self.query_thread = Thread(target=lambda: self.show_product_info(barcodedata))
            self.query_thread.start()


    def __init__(self):
            # NOTE(review): the Tk base class is not initialised in the code
            # shown here; confirm the real code calls super().__init__().
            # Initialization of timer
            # -self.idle_timer = Timer(5.0, self.show_idle_screen)
            # The idle screen is drawn from a separate thread at start-up.
            self.new_thread = Thread(target=self.show_idle_screen)
            self.new_thread.start()

# Build the window and enter the Tk event loop only when run as a script,
# so importing this module has no side effects.
if __name__ == "__main__":
    root = Application()
    root.mainloop()

Log:

timer:initialized
timeout:6
timeout:0.25
callback:<bound method Application.show_idle_screen of <__main__.Application object .>>
start_timer
10
  • btw: Thread(target=self.show_product_info, args=(barcodedata,)) Commented Oct 8, 2020 at 9:12
  • Timer has function cancel() to stop the timer. You will have to stop timer and create new Timer Commented Oct 8, 2020 at 9:15
  • 2
    Does this answer your question? How to set a timer & clear a timer? Commented Oct 8, 2020 at 9:16
  • (the answer to your question is contained in the question, you already know the answer to the other question - sorry if this sounds confusing) Commented Oct 8, 2020 at 9:16
  • @furas, using args raises an error: TypeError: show_product_info() takes 2 positional arguments but 14 were given. Commented Oct 8, 2020 at 9:17

1 Answer 1

0

I was finally able to get the behaviour I wanted after some trial and error.

from threading import Thread, Timer

class TimerThread(Thread):
    """Restartable inactivity timer that runs in its own thread.

    After ``start()`` the thread idles until the timer is armed with
    ``start_timer()`` or ``restart_timer()``.  Once armed it counts down
    ``timeout`` seconds in steps of ``sleep_chunk`` seconds; when the
    countdown reaches zero the timer disarms itself and invokes
    ``callback(*args)`` on this thread.  ``restart_timer()`` during a
    countdown puts the full timeout back on the clock.

    Args:
        timeout: Seconds of inactivity before the callback fires.
        sleep_chunk: Polling granularity in seconds; it bounds how long
            ``stop_timer()`` and ``terminate()`` take to be noticed.
        callback: Callable invoked on expiry, or ``None`` for no action.
        *args: Positional arguments passed to ``callback``.
    """

    def __init__(self, timeout=3, sleep_chunk=0.25, callback=None, *args):
        # Imported locally because the file only pulls Thread and Timer
        # out of the threading module.
        from threading import Event, Lock

        super().__init__()
        print('timer:initialized')
        self.timeout = timeout
        print('timeout:' + str(timeout))
        self.sleep_chunk = sleep_chunk
        # The 'timeout:' label is kept as-is so output matches existing logs.
        print('timeout:' + str(sleep_chunk))
        self.callback = callback
        if callback is None:
            print('callback:none')
        else:
            print('callback:' + str(callback))
        self.callback_args = args

        self.terminate_event = Event()
        self.start_event = Event()
        self.reset_event = Event()
        # Serialises countdown bookkeeping between run() and the control
        # methods, which are called from other threads.
        self._count_lock = Lock()
        self.count = self._full_count()

    def _full_count(self):
        """Return the number of sleep chunks that make up one full timeout."""
        return self.timeout / self.sleep_chunk

    def run(self):
        """Thread body: count down while armed and fire the callback on expiry.

        Returns once ``terminate()`` has been called; no callback is fired
        after termination has been requested.
        """
        print('timer:run')
        while not self.terminate_event.is_set():
            if not self.start_event.is_set():
                # Idle: block on the event instead of spinning.  The timeout
                # keeps terminate() responsive while the timer is disarmed.
                self.start_event.wait(self.sleep_chunk)
                continue

            # Sleep one chunk, waking early if restart_timer() signals.
            self.reset_event.wait(self.sleep_chunk)

            with self._count_lock:
                # Re-check the flag under the lock so a restart that lands
                # right at expiry is honoured rather than lost.
                if self.reset_event.is_set():
                    self.reset_event.clear()
                    self.count = self._full_count()
                    continue
                if not self.start_event.is_set():
                    # stop_timer() was called during the wait.
                    continue
                self.count -= 1
                if self.count > 0:
                    continue
                # Expired: disarm and rewind before leaving the lock.
                self.start_event.clear()
                self.count = self._full_count()

            if self.terminate_event.is_set():
                break
            print('timeout. calling function...')
            # Called outside the lock so the callback may safely call
            # restart_timer(), stop_timer() or terminate().
            if self.callback is not None:
                self.callback(*self.callback_args)

    def start_timer(self):
        """Arm the timer; the countdown continues from the current count."""
        print('start_timer')
        self.start_event.set()

    def stop_timer(self):
        """Disarm the timer and rewind the countdown to the full timeout."""
        print('stop_timer')
        with self._count_lock:
            self.start_event.clear()
            # Drop any pending reset so it cannot leak into the next run.
            self.reset_event.clear()
            self.count = self._full_count()

    def restart_timer(self):
        """Reset the countdown if the timer is armed, otherwise arm it."""
        print('restart_timer')
        with self._count_lock:
            running = self.start_event.is_set()
            if running:
                self.reset_event.set()
            else:
                self.start_event.set()
        if running:
            print('restart_timer:reset')
        else:
            print('restart_timer:start')

    def terminate(self):
        """Ask the thread to exit; it does so within about one sleep chunk."""
        print('terminate')
        self.terminate_event.set()

class Application(Tk):
    """Kiosk window: shows product info per scan, then falls back to idle.

    A single ``TimerThread`` is created in ``__init__`` and reused; every
    scan restarts it, so the idle screen only returns after a full timeout
    with no new serial data.  Parts of the example are elided and marked
    with placeholder comments.
    """

    def __init__(self):
        # The Tk base must be initialised first; without it winfo_children()
        # and mainloop() have no underlying Tcl interpreter to talk to.
        super().__init__()

        # One long-lived inactivity timer shared by all scans.
        # NOTE(review): timeout and sleep_chunk are not defined in this
        # snippet -- presumably module-level values; confirm.
        self.tmr = TimerThread(timeout, sleep_chunk, self.show_idle_screen)
        self.tmr.start()
        # Arm the timer right away, so the idle screen is redrawn once more
        # after the first timeout even if nothing was scanned.
        self.tmr.start_timer()

        # NOTE(review): this draws the initial idle screen from a worker
        # thread before mainloop() is running; consider calling
        # show_idle_screen() directly here -- confirm against the tkinter
        # threading notes.
        self.new_thread = Thread(target=self.show_idle_screen)
        self.new_thread.start()

    def show_idle_screen(self):
        """Clear the window and show the idle screen (widget creation elided).

        Also used as the timer callback, in which case it runs on the timer
        thread rather than the Tk main thread.
        """
        for widget in self.winfo_children():
            widget.destroy()
        # Create new widget element
        print('idle_screen')

    def show_product_info(self, barcodeno):
        """Show info for ``barcodeno`` (elided) and restart the idle timer."""
        # call web api and get response
        # create information widget element
        # Restart rather than recreate: resets a running countdown, or arms
        # the timer again if it has already expired.
        self.tmr.restart_timer()

    def serial_port_data_received(self):
        """Run show_product_info on a worker thread when serial data is waiting."""
        if self.serial.inWaiting() > 0:
            # The rest of handling serial data
            # NOTE(review): barcodedata is expected to be produced by the
            # elided serial handling above.
            self.query_thread = Thread(
                target=self.show_product_info, args=(barcodedata,)
            )
            self.query_thread.start()

# Build the window and enter the Tk event loop only when run as a script,
# so importing this module has no side effects.
if __name__ == "__main__":
    root = Application()
    root.mainloop()

Log:

idle_screen
timeout. calling function...
idle_screen
Going to query information for data
restart_timer
restart_timer:start
timeout. calling function...
idle_screen
Going to query information for data

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.