I'm trying to create an async Reader and Writer in tokio. These require Send and must be thread safe. (There does not seem to be a way to write single-threaded tokio code that avoids mutexes.)

The reader and writer both need to interact, e.g. data that is read might result in a response being written.

I would like both Reader and Writer to have a thread safe pointer to a Session that can ensure communication between the two.

/// function on the 
impl Session {

pub fn split(&mut self, sock: TcpStream) -> (Reader, Writer) {
    let (read_half, write_half) = sock.split();

    let session = Arc::new(RwLock::new(self)); // <- expected lifetime

    ( Reader::new(session, read_half), Writer::new(Arc::clone(session), write_half) )
}
...
}

pub struct Reader {
    session: Arc<RwLock<&mut StompSession>>,
    read_half: ReadHalf<TcpStream>,
    ...

pub struct Writer {
    session: Arc<RwLock<&mut StompSession>>,
    write_half: WriteHalf<TcpStream>,
    ....

I don't really understand the difference between Arc<RwLock<&mut StompSession>> and Arc<RwLock<StompSession>>; both can only be talking about pointers.

Naturally I have come at this by struggling with the borrow checker, and the Rust book only has examples for RwLock with integers, not mutable "objects".

    Do you know MPSC (or the crossbeam crate)? Commented Oct 10, 2019 at 8:20

1 Answer

Let's start by clearing a few things:

does not seem to be a way to write single threaded tokio code that avoids mutexes

The Mutex requirement has nothing to do with single-threadedness but with mutable borrows. Whenever you spawn a future, that future is its own entity; it isn't magically part of your struct and most definitely does not know how to keep a &mut self. That is the point of the Mutex - it allows you to dynamically acquire a mutable reference to inner state - and the Arc allows you to have access to the Mutex itself in multiple places.
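To make that concrete, here is a minimal sketch using only the standard library and plain threads rather than tokio. The Session struct, its frames_seen field and the count_frames function are made up for illustration; they are not from your code.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for the session state (not the asker's real type).
#[derive(Debug, Default)]
struct Session {
    frames_seen: u32,
}

/// Spawns `workers` threads; each one locks the shared session and mutates it.
fn count_frames(workers: u32) -> u32 {
    // The Arc gives shared ownership, the Mutex hands out `&mut Session` on demand.
    let session = Arc::new(Mutex::new(Session::default()));

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            // Each thread owns its own handle to the same Mutex.
            let session = Arc::clone(&session);
            thread::spawn(move || {
                let mut guard = session.lock().unwrap();
                guard.frames_seen += 1;
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }

    // Bind to a local so the lock guard is dropped before `session` is.
    let total = session.lock().unwrap().frames_seen;
    total
}
```

Calling count_frames(4) ends with every thread having gone through the same lock, which is the same shape a spawned future would use to reach the session.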

Their non-synchronized equivalents are Rc and Cell/RefCell, by the way, and their content (whether synchronized or unsynchronized) should be an owned type.
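As a rough illustration of the unsynchronized pair, the sketch below shares one owned value between two handles on a single thread. The Session struct, its pending field and the share_on_one_thread function are hypothetical names chosen for this example.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical session holding replies that still need to be written.
struct Session {
    pending: Vec<String>,
}

/// Shares one owned Session between two handles on a single thread
/// and returns how many queued replies the "writer" side drained.
fn share_on_one_thread() -> usize {
    // The RefCell owns the Session; it does not hold a `&mut Session`.
    let session = Rc::new(RefCell::new(Session { pending: Vec::new() }));
    let reader_side = Rc::clone(&session);
    let writer_side = Rc::clone(&session);

    // The "reader" queues a reply through a short-lived mutable borrow.
    reader_side.borrow_mut().pending.push("RECEIPT".to_string());

    // The "writer" later takes its own mutable borrow and drains the queue.
    let drained = writer_side.borrow_mut().pending.drain(..).count();
    drained
}
```

Because the RefCell owns the value, no lifetime parameter leaks into the types that hold the Rc, which is the practical difference from wrapping a &mut reference.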

The Send requirement actually shows up when you use futures on top of tokio, as an Executor requires futures spawned on it to be Send (for obvious reasons - you could make a spawn_local method, but that'd cause more problems than it solves).

Now, back to your problem. I'm going to give you the shortest path to an answer. It will not be entirely the right way of doing things, though: since I do not know what protocol you're going to lay on top of TcpStream or what requirements you have, I cannot point you in the right direction (yet). That is what comments are for: give me more requirements and I'll happily edit this!

Anyway. Back to the problem. Since a lock such as Mutex<_> or RwLock<_> is better used with an owned type, we're going to do that right now and "fix" both your Reader and Writer:

pub struct Reader {
    session: Arc<RwLock<Session>>,
    read_half: ReadHalf<TcpStream>,
}

pub struct Writer {
    session: Arc<RwLock<Session>>,
    write_half: WriteHalf<TcpStream>,
}

Since we changed this, we also need to reflect it in split, and to be able to do that we're going to need to consume self. That is fine, however, as each object we return holds its own clone of the Arc<RwLock<_>>:

impl Session {

    pub fn split(self, sock: TcpStream) -> (Reader, Writer) {
        let (read_half, write_half) = sock.split();

        let session = Arc::new(RwLock::new(self));
        ( Reader::new(session.clone(), read_half), Writer::new(session, write_half) )
    }
}

And lo and behold, it compiles, and each Reader/Writer pair now shares one session that either side can borrow (mutably or immutably) through the lock!
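Here is a rough sketch of how the two sides could then talk through the shared session. It leaves out tokio and the socket halves so that it stands alone; the outbox field and the on_frame, take_pending and demo functions are hypothetical names for this example, not part of your code or of any library.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical session: replies queued by the reader for the writer.
struct Session {
    outbox: Vec<String>,
}

struct Reader {
    session: Arc<RwLock<Session>>,
}

struct Writer {
    session: Arc<RwLock<Session>>,
}

impl Session {
    /// Consumes the session and hands a shared handle to each side.
    fn split(self) -> (Reader, Writer) {
        let session = Arc::new(RwLock::new(self));
        (Reader { session: Arc::clone(&session) }, Writer { session })
    }
}

impl Reader {
    /// Pretend a frame arrived that needs a reply; queue it for the writer.
    fn on_frame(&self, frame: &str) {
        let mut session = self.session.write().unwrap();
        session.outbox.push(format!("ACK {}", frame));
    }
}

impl Writer {
    /// Takes everything queued so far, leaving the outbox empty.
    fn take_pending(&self) -> Vec<String> {
        let mut session = self.session.write().unwrap();
        std::mem::take(&mut session.outbox)
    }
}

/// Runs one read-then-reply round trip through the shared session.
fn demo() -> Vec<String> {
    let (reader, writer) = Session { outbox: Vec::new() }.split();
    reader.on_frame("frame-1");
    writer.take_pending()
}
```

One thing to watch in async code: the std lock guards are not Send, so keep each guard inside a short scope like the methods above rather than holding it across an .await.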

The playground snippet highlights the changes made. As I said, it works now, but it's going to bite you in the ass the moment you try to do any actual I/O, as you're going to need something on top of both ReadHalf and WriteHalf to be able to use them properly.

3 Comments

OK, that makes sense. I got this to work with a similar Arc<RwLock<_>> lock on WriteHalf; the read half will probably need the same to force-close it. I hope all this locking is fast :) I would not dream of writing synchronized code like that in Java, or mutexes in C. BTW the protocol is to be STOMP (stomp.github.io)
@teknopaul if for some reason your locking operation ends up blocking, the futures_locks crate contains future-optimized locking primitives. There are also other (non-locking) options providing the same thing; at the end of the day, it is about what constraints you have.
@teknopaul Actually, something just hit me: what makes you think this is different from Java? Mutex is essentially synchronized, since the underlying behavior is the same. Primitive updates can be done lock-free with the std::sync::atomic types, which are exactly the same as their Java equivalents. The only thing you cannot do in Rust that you can do in Java is volatile.
