
I'm trying to reproduce in Rust what I did in Golang using Clean Architecture with the Repository pattern.

The code in this repo is a small reproduction.

It works in both the first and the second commit.

But when I add a lambda as an argument to an async function in the async trait, I get this error on line 60:

future cannot be sent between threads safely
the trait `std::marker::Sync` is not implemented for `dyn std::ops::FnMut(&'life3 Player) -> std::result::Result<Player, ()>`
required for the cast to the object type `dyn std::future::Future<Output = std::result::Result<(), ()>> + std::marker::Send`rustc
main.rs(59, 9): captured value is not `Send` because `&` references cannot be sent unless their referent is `Sync`

I do not understand what to do and I don't know if this is a correct pattern.

In Golang this is a very useful (and easy) pattern for sharing an element between repository code and business logic within a single DB transaction:

  • the repository function gets the player from the DB
  • it calls the lambda (or an anonymous async function; Golang has no closures like Rust's) with this existing player
  • the business logic code works with this existing player in the same DB transaction
  • the business logic code returns to the repository code either the "updated" player to save or an error
  • the repository then commits or aborts the DB transaction

Is this a correct way to do things in Rust?

If not, what are the alternatives?

If yes, how do I fix this code?

Code

  • main.rs:
use std::sync::{Arc, Mutex};

#[derive(Clone)]
struct Player {
    pub name: String,
    pub payed: bool,
}

#[async_trait::async_trait]
trait RepoMemory {
    async fn player_create(&self, name: &str, payed: bool) -> Result<Player, ()>;

    async fn player_delete(
        &self,
        id: &str,
        // lambda: impl FnMut(&Player) -> Result<Player, ()>,
        lambda: &dyn FnMut(&Player) -> Result<Player, ()>,
    ) -> Result<(), ()>;
}

struct InMemoryRepository {
    pub players: Mutex<Vec<Player>>,
}

impl InMemoryRepository {
    pub fn new() -> Self {
        Self {
            players: Mutex::new(vec![]),
        }
    }
}

#[async_trait::async_trait]
impl RepoMemory for InMemoryRepository {
    async fn player_create(&self, name: &str, payed: bool) -> Result<Player, ()> {
        let mut players = self.players.lock().unwrap();

        if players.iter().any(|player| player.name == name) {
            println!("Player {} already exists!", name);

            return Err(());
        }

        let new_player = Player {
            name: name.to_string(),
            payed,
        };

        players.push(new_player.clone());

        println!("Player {} created", &name);

        Ok(new_player)
    }

    async fn player_delete(
        &self,
        name: &str,
        lambda: &dyn FnMut(&Player) -> Result<Player, ()>,
    ) -> Result<(), ()> {
        let mut players = self.players.lock().unwrap();

        match players.iter().position(|player| player.name == name) {
            Some(index) => {
                let player = players.get(index).unwrap();

                lambda(player)?;

                players.remove(index);

                println!("Player {} deleted", name);

                Ok(())
            }
            None => {
                println!("Cannot find player {}!", name);

                Err(())
            }
        }
    }
}

struct CreateCommand {
    repo: Arc<dyn RepoMemory>,
}

impl CreateCommand {
    pub fn new(repo: Arc<dyn RepoMemory>) -> Self {
        Self { repo }
    }

    pub async fn execute(&self, name: &str, payed: bool) -> Result<Player, ()> {
        let created_player = self.repo.player_create(name, payed).await.unwrap();

        Ok(created_player)
    }
}

struct DeleteCommand {
    repo: Arc<dyn RepoMemory>,
}

impl DeleteCommand {
    pub fn new(repo: Arc<dyn RepoMemory>) -> Self {
        Self { repo }
    }

    #[allow(clippy::result_unit_err)]
    pub async fn execute(&self, name: &str) -> Result<bool, ()> {
        self.repo
            .player_delete(name, &|existing_player| {
                // I need to work with existing_player here, do some .await calls and return it to repo maybe, for example:

                if existing_player.payed {
                    println!("Cannot delete paying player {}!", name);

                    return Err(());
                }

                Ok(existing_player.clone())
            })
            .await?;

        Ok(true)
    }
}

struct Service {
    pub player_create: CreateCommand,
    pub player_delete: DeleteCommand,
}

fn new_service() -> Service {
    let repo_memory = Arc::new(InMemoryRepository::new());

    Service {
        player_create: CreateCommand::new(repo_memory.clone()),
        player_delete: DeleteCommand::new(repo_memory),
    }
}

#[tokio::main]
async fn main() {
    let service = new_service();

    let bob = Player {
        name: "bob".to_owned(),
        payed: true,
    };

    service
        .player_create
        .execute(&bob.name, bob.payed)
        .await
        .unwrap();

    service.player_delete.execute(&bob.name).await.unwrap();

    let john = Player {
        name: "john".to_owned(),
        payed: false,
    };

    service
        .player_create
        .execute(&john.name, john.payed)
        .await
        .unwrap();

    service.player_delete.execute(&john.name).await.unwrap();
}
  • Change it to &(dyn FnMut(&Player) -> Result<Player, ()> + Sync). Commented Sep 12, 2022 at 8:47
  • Can you explain a bit more? Commented Sep 12, 2022 at 8:50
  • Explain what to do or why? Commented Sep 12, 2022 at 8:55
  • Your code doesn't work because &dyn FnMut(&Player) -> Result<Player, ()> may be a non-thread-safe closure (even though you're really using a thread-safe closure, you're telling the compiler that you may want to use a non-thread-safe closure at some point). Adding + Sync means that you'll only ever use thread-safe closures here. Commented Sep 12, 2022 at 10:32
  • async_trait converts your methods to ones that return Pin<Box<dyn Future<...> + Send>>, which makes them usable in multi-threaded executors. (You can opt out of that behavior if you like.) That requires that any argument passed to the method be Send, because the future can be created in one thread and driven to completion in another. As the compiler states, for a &dyn ... reference to be Send, the underlying closure must be Sync, which you can ensure by adding the appropriate bound. Commented Sep 12, 2022 at 12:26

1 Answer


Behind the scenes, async_trait converts your methods into ones that return a type-erased, virtually dispatched future, Pin<Box<dyn Future<Output = ...> + Send>>. The Send bound makes the created futures usable in multi-threaded executors, which poll a future on whichever thread is currently free and therefore have to move it from thread to thread. (You can opt out of that bound with #[async_trait(?Send)] if you don't use a multi-threaded executor.)
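To make the conversion concrete, here is a hand-written approximation of the expanded method, using only the standard library. It is a simplified sketch, not the macro's exact output: the `Repo` trait, `NoopRepo`, the `BoxFuture` alias and the tiny `block_on` helper are all hypothetical names introduced for illustration. It also uses `Fn` rather than `FnMut`, because an `FnMut` cannot be called through a shared `&` reference.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

#[derive(Clone, Debug, PartialEq)]
struct Player {
    name: String,
    payed: bool,
}

// Approximate shape of the return type generated for an `async fn` in the trait.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Hypothetical, simplified stand-in for the macro-expanded trait.
trait Repo {
    fn player_delete<'a>(
        &'a self,
        player: &'a Player,
        // `+ Sync` on the referent is what makes this `&` reference `Send`.
        lambda: &'a (dyn Fn(&Player) -> Result<Player, ()> + Sync),
    ) -> BoxFuture<'a, Result<(), ()>>;
}

struct NoopRepo;

impl Repo for NoopRepo {
    fn player_delete<'a>(
        &'a self,
        player: &'a Player,
        lambda: &'a (dyn Fn(&Player) -> Result<Player, ()> + Sync),
    ) -> BoxFuture<'a, Result<(), ()>> {
        // The async block captures `player` and `lambda`. The cast to
        // `dyn Future + Send` only compiles if every capture is `Send`.
        Box::pin(async move { lambda(player).map(|_| ()) })
    }
}

// Waker that does nothing; enough to drive futures that never suspend.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Minimal single-threaded executor, only here to keep the sketch std-only.
fn block_on<T>(mut fut: BoxFuture<'_, T>) -> T {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

Calling `block_on(NoopRepo.player_delete(&player, &rule))` with a closure `rule` that rejects paying players drives the boxed future to completion on the current thread. Dropping `+ Sync` from the `lambda` parameter should bring back the "future cannot be sent between threads safely" error at the `Box::pin` line.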

The Send bound requires that any argument passed to the method be Send, because the future can be created in one thread and polled in others. As the compiler states, for a &dyn ... reference to be Send, the underlying closure must be Sync, which you can ensure by adding + Sync to the type of the closure you receive. This bound doesn't limit what you can do in the closure in practice, because you will very rarely need a non-Sync closure.
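The relationship between `Send` on the reference and `Sync` on the closure can be checked without any async machinery. The following is a minimal sketch using only the standard library; `assert_send` and `check_on_other_thread` are hypothetical helpers, and the scoped thread stands in for an executor that polls the future on a different thread.

```rust
use std::thread;

#[derive(Clone, Debug, PartialEq)]
struct Player {
    name: String,
    payed: bool,
}

// Compile-time probe: accepts only values that may be moved to another thread.
fn assert_send<T: Send>(value: T) -> T {
    value
}

// Hypothetical helper: runs the business-logic closure on a different thread,
// which is what a multi-threaded executor is allowed to do with a future.
fn check_on_other_thread(
    player: &Player,
    lambda: &(dyn Fn(&Player) -> Result<Player, ()> + Sync),
) -> Result<Player, ()> {
    // Compiles only because the referent is `Sync`; without `+ Sync` in the
    // parameter type above, this call is rejected.
    let lambda = assert_send(lambda);

    // Both captured references are `Send`, so the spawn is accepted.
    thread::scope(|scope| scope.spawn(move || lambda(player)).join().unwrap())
}
```

An ordinary closure such as `|p: &Player| if p.payed { Err(()) } else { Ok(p.clone()) }` satisfies the bound as is, because it captures nothing that is not `Sync`. A closure capturing, say, an `Rc` or a `RefCell` would be rejected at the call site instead.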
