0

In my example certain events are not handled (messages are not printed) and I don't understand why. To motivate my code a little: The problem I want to solve (currently) is only a simple blinking LED connected to a Raspberry Pi. The main thread is a web server (axum) that handles messages and changes the led-state accordingly. I want to run the LED-control-code in another thread that listens to messages sent over a channel:

enum ThreadControl {
    Quit,
    LedMessage(LedState),
}

enum LedState {
    Off,
    On,
    BlinkOnOff { period_ms: u16 },
    // BreathInOutLinear { period_ms: u16 },
    // BreathInOutLogarithmic { period_ms: u16 },
    // FallingSawtoothLinear { period_ms: u16 },
    // RisingSawtoothLinear { period_ms: u16 },
    // AttackDecay { attack_ms: u16, decay_ms: u16 },
    // ... more mindblowing blink-stuff that might 
    // become complicated and should run independently
}

Next I define a function that controls the LED in a loop and listens to ThreadControl-messages sent over a message-passing-channel and spawn that function in a thread. While the led-control-thread is running, I start another thread that sends ThreadControl-messages to change the LED's behavior or request the function to stop the loop. (Maybe I don't need another thread to send messages, I'm not sure).

use std::{
    sync::mpsc::{channel, Receiver},
    thread::{self, sleep, spawn},
    time::{Duration, Instant},
};

fn run_led_thread(rx: Receiver<ThreadControl>) {
    println!("hello from led control thread");

    let mut pin_state = false; // later will be the actual GPIO pin
    let mut led_state = LedState::Off;
    let mut now = Instant::now();
    let mut keep_running = true;

    loop {
        rx.iter().for_each(|msg| match msg {
            ThreadControl::LedMessage(new_led_state) => {
                println!("Handling led message");
                led_state = new_led_state;
            }
            ThreadControl::Quit => {
                println!("Quitting thread");
                keep_running = false;
            }
        });

        if keep_running {
            match led_state {
                LedState::Off => {
                    pin_state = false;
                    println!("off")
                }
                LedState::On => {
                    pin_state = true;
                    println!("on")
                }
                LedState::BlinkOnOff { period_ms: value } => {
                    if now.elapsed() > Duration::from_millis((value as u64) / 2) {
                        pin_state = !pin_state;
                        now = Instant::now();
                        match pin_state {
                            true => println!("blink: on"),
                            false => println!("blink: off"),
                        }
                    }
                }
            }
            // avoid thread running at 100%
            // sleep arbitrary duration
            // maybe there's a better solution?
            sleep(Duration::from_millis(5))
        } else {
            break;
        }
    }
}

fn main() {
    let (tx, rx) = channel();

    // a thread that controls the LED and waits for new
    // instructions for LED behavior or Quit-Messate
    let handle_led_driver = spawn(|| run_led_thread(rx));

    // another thread that sends control messages.
    // (maybe this could be the main thread)
    let handle_control = spawn(move || {
        tx.send(ThreadControl::LedMessage(LedState::On)).unwrap();
        thread::sleep(Duration::from_millis(200));
        tx.send(ThreadControl::LedMessage(LedState::Off)).unwrap();
        thread::sleep(Duration::from_millis(200));
        tx.send(ThreadControl::LedMessage(LedState::BlinkOnOff {
            period_ms: 10,
        }))
        .unwrap();
        thread::sleep(Duration::from_millis(500));
        tx.send(ThreadControl::Quit).unwrap();
    });

    handle_led_driver.join().unwrap();
    handle_control.join().unwrap();
}

When I run this example I don't get the expected output. I get:

hello from led control thread
Handling led message
Handling led message
Handling led message
Quitting thread

What I would expect is

hello from led control thread
Handling led message
on
Handling led message
off
Handling led message
blink: on
blink: off
blink: on
... // and so on
Quitting thread

Somehow that second match (which matches LedState) is not working as no messages are printed. What am I doing wrong?

Also: Is this a clever solution in the first place, or is it rather stupid for this kind of problem. I noticed that receiver seems to have a kind of queue. In my case LED-example I don't expect the queue to mostly be one item (or none at all). In Rust there is also shared state with Arc<Mutex< ... >> which I also considered. But somewhere I read "don't communicate via shared memory".

3
  • 4
    rx.iter().for_each() loops over every message that's ever (going to be) sent to that rx not those that are currently in it. Commented May 14, 2024 at 13:05
  • Thank you, that explains some thing! Although I am not sure how to run the event loop, still receive the messages and be able to quit the loop. Does that mean that message-passing would be better suited for other tasks? Commented May 14, 2024 at 14:49
  • If you are using Axum, you are probably using tokio. So maybe you can just have a channel with select!? Commented May 14, 2024 at 19:14

2 Answers 2

1

The problem is that rx.iter().for_each() loops over every message that's ever (going to be) sent to that rx not only those that are currently in it.

You can handle the messages and the LED all in the outer loop by using recv* instead of turning the rx into an iterator:

fn run_led_thread(rx: Receiver<LedState>) {
    println!("hello from led control thread");

    let mut pin_state = false; // later will be the actual GPIO pin
    let mut led_state = LedState::Off;
    let mut next_blink = None::<Instant>;

    loop {
        let msg = if let Some(next_blink) = next_blink {
            // we're currently blinking the LED, so we have to block for
            // - the next message
            // - or until we need to switch the LED state again
            // whichever comes first
            rx.recv_timeout(next_blink.saturating_duration_since(Instant::now()))
        } else {
            // we're not currently blinking so we can just block
            // until we receive another message
            rx.recv().map_err(RecvTimeoutError::from)
        };

        match msg {
            Ok(new_led_state) => {
                println!("Handling led message");
                led_state = new_led_state;
                next_blink = None; // reset potential previous blinking
            }
            Err(RecvTimeoutError::Disconnected) => {
                println!("Quitting thread");
                break;
            }
            Err(RecvTimeoutError::Timeout) => {}
        }

        match led_state {
            LedState::Off => pin_state = false,
            LedState::On => pin_state = true,
            LedState::BlinkOnOff { period } => {
                pin_state = !pin_state;
                *next_blink.get_or_insert_with(Instant::now) += period / 2;
                print!("blink: ");
            }
        }

        println!("{}", if pin_state { "on" } else { "off" });
    }
}

enum LedState {
    Off,
    On,
    BlinkOnOff { period: Duration },
}

on nightly you can enable #![feature(deadline_api)] and replace

rx.recv_timeout(next_blink.saturating_duration_since(Instant::now()))

with

rx.recv_deadline(next_blink)

Like Masklinn already hinted you can just use the channel itself to signal quitting, that means in turn you don't need ThreadControl but can send LedState directly. I've also gotten rid of keep_running by just immediately breaking out of the loop in that case.

Sign up to request clarification or add additional context in comments.

1 Comment

Thank You! Yes, now it is working! And it's not even using any cpu-time it seems. The only think I don't completely understand is the whole let msg = if let ... {} else {}-block (it's quite dense :D). I think it's blocking /pausing the execution of the thread/loop until the time (next_blink - now) is passed or a new message is received (in case next_blink != None) OR it's simply waiting for a new message. It's also responsible for the low CPU-occupation, isn't it? (While writing this comment I think I have understood it :D - It's really quite clever!!!)
1
    loop {
        rx.iter().for_each

as cafce25 commented this doesn't make any sense: a channel's iterator will block and iterate until the channel is closed, so the outer loop is either dead code (because you've sent a Quit message before closing the channel, so the if will never trigger) or the outer loop is infinite (because you've not sent a Quit before closing the channel, so the flag is set forever).

Just remove the outer iteration, do your things in the inner (now only) iteration, and remove the Quit message: when the producer terminates and drops the sender, that'll close the channel, which will end the iteration. And that means you can just send LedState , ThreadControl is completely unnecessary framing.

Finally the sleep doesn't seem useful, the only situation in which it would do something (after fixing the code so it works) is if the sender produces messages more than once every 5ms, but if that happens your receiver channel is going to fill up with messages because the consumer won't have enough throughput. If you need to rate limit state changes I think it would be a good idea to drop messages during the debouncing window.

Or keep the sleep but then use a rendezvous or a channel with a very small capacity, so you provide backpressure to the producer, there's no reason for it to run at full blast if it's just filling the rest of the program with garbage.

And not sure why you're iterating over the channel using Iterator::for_each, for msg in rx seems plain simpler.

9 Comments

You can't "Just remove the outer iteration", the outer iteration is still needed for "blinking" the LED. Likely OP wants to recv_timeout (or recv_deadline on nightly)
Thank you for that explanation. Does that mean the Led-thread is doing nothing while it is waiting for new messages? I refactored my code a little changing it to for msg in rx { match msg { ... } }. In the match arm for Blink I now run a loop that switches it on and off. But now I cannot break out of it. So I still need a Quit-message, don't I? Alternatively: How do I "run" my led-control in the separate thread, if I should not use a loop there if I cannot Quit it with messages?
thank you @cafce25 I wrote my response before you responded. I moved the blink-loop in a match arm, but now cannot handle new messages.
Thinking about this problem from a completely different angle: Does it even make sense to handle this with message channels or something like Arc<Mutex<LedMessage>>? Could I maybe just use a Rc<LedMessage>, because the Led-Thread only reads the value and updates it's behavior?
@exocortex as noted in my comment, when the producer thread terminates and drops the Sender, it closes the corresponding Receiver, which terminates the iteration.
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.