1

I have a question concerning data structures that contain async operations. It may sound weird.

TestActor contains a MailboxProcessor-like mailbox and has three functions: Receive prepares the mailbox processor to receive messages, while Post and PostAndAsyncReply are used to send messages to the actor.

/// Test actor that keeps an integer counter behind an OnlyLatestMBP mailbox.
/// The mailbox starts receiving as part of construction, using the `timeout`
/// passed to the constructor.
type TestActor (init, timeout) =
    let mutable counter = init

    // The mailbox has to be bound before the `do` bindings that use it:
    // class-level let/do bindings are evaluated top to bottom, so referencing
    // `mailbox` ahead of its definition does not compile.
    let mailbox = OnlyLatestMBP<TestMessage> ()

    // Message handler: Add mutates the counter, GetCounter replies with its
    // current value.
    let rcvFun = fun (msg) -> async {
            match msg with
            | Add i -> 
                counter <- counter + i
            | GetCounter reply -> 
                reply.Reply counter}

    do printfn "Initializing actors: "
    do mailbox.Receive (rcvFun, timeout) ////// RECEIVE IS CALLED AT CONSTRUCTION

    /// Calls Receive on the mailbox again with the given timeout (milliseconds).
    member x.Receive (timeout) =       
        mailbox.Receive (rcvFun, timeout) 

    /// Posts a message; the resulting async yields false when the mailbox did
    /// not accept it within `timeout`.
    member x.Post (msg: TestMessage, timeout) = 
        mailbox.Post(msg, timeout)

    /// Posts a request built by `replyChannel` and asynchronously waits for the
    /// reply; the result is None when no reply arrived within `timeout`.
    member x.PostAndAsyncReply (replyChannel, timeout) = 
        mailbox.PostAndAsyncReply(replyChannel, timeout)

I'd like to use this example to understand an issue that affected my code. In the usual example for stacking agents in a data structure, Receive is executed at construction. In my example, the agent could be tested with the code below:

// Lazy sequence of five actors seeded with 1..5; each actor starts receiving
// inside its own constructor.
let actorsWorkforce = 
    seq { for seed in 1 .. 5 -> TestActor(seed, 60000) }

// Ask every actor for its counter, run the requests in parallel and wait for
// all of the answers.
let test = 
    let askCounter (actor: TestActor) =
        actor.PostAndAsyncReply ( (fun reply -> GetCounter reply), 10000)
    actorsWorkforce 
    |> Seq.map askCounter
    |> Async.Parallel
    |> Async.RunSynchronously 


// Print one line per actor: the counter it reported, or a failure marker.
let result = 
    let report idx element =
        match element with
        | Some x -> printfn "Actor %i: OK with result %A" idx x
        | None -> printfn "Actor %i: Failed" idx
    Array.iteri report test

And this works as planned.

However, let's say I'd like to postpone the call to Receive to a later stage.

/// Test actor that keeps an integer counter behind an OnlyLatestMBP mailbox.
/// Nothing is received until the caller invokes Receive explicitly.
type TestActor (init) = 
    let mutable counter = init

    let mailbox = OnlyLatestMBP<TestMessage> ()

    // Message handler: GetCounter replies with the current value, Add mutates it.
    let rcvFun message = 
        async {
            match message with
            | GetCounter channel -> channel.Reply counter
            | Add delta -> counter <- counter + delta
        }

    /// Starts the mailbox with this actor's handler and the given timeout (ms).
    member this.Receive (timeout) = mailbox.Receive (rcvFun, timeout)

    /// Posts a message to the mailbox, waiting at most `timeout` ms for it to
    /// become active.
    member this.Post (msg: TestMessage, timeout) = mailbox.Post (msg, timeout)

    /// Posts a request built by `replyChannel` and asynchronously waits for the reply.
    member this.PostAndAsyncReply (replyChannel, timeout) = 
        mailbox.PostAndAsyncReply (replyChannel, timeout)


// NOTE(review): Seq.map is lazy, so `actorsWorkforce` is a recipe for building
// actors rather than a fixed collection: every enumeration of it runs the
// TestActor constructor again and yields five brand-new actors.
let actorsWorkforce = 
    seq { 1 .. 5}
    |> Seq.map (fun idx -> TestActor(idx))

// First enumeration: five actors are created and Receive is called on them.
// No reference to these instances is kept.
actorsWorkforce |> Seq.iter (fun idx -> idx.Receive (60000))

// Second enumeration: five *different* actors are created. Receive was never
// called on these, so each request below ends in None.
let test = 
    actorsWorkforce 
    |> Seq.map ( fun idx -> idx.PostAndAsyncReply ( (fun reply -> GetCounter reply), 10000) )

    |> Async.Parallel
    |> Async.RunSynchronously 


// Prints one line per actor: the counter it reported, or "Failed" for None.
let result = 
    test 
    |> Array.iteri (fun idx element ->
        match element with
        | Some x -> printfn "Actor %i: OK with result %A" idx x
        | None -> printfn "Actor %i: Failed" idx )

This piece of code compiles but does not work. mailbox.Receive has the type signature member Receive : callback:('a -> Async<unit>) * ?timeout:int -> unit so it would make sense to execute Receive with Seq.iter. I suspect that the code does not work because actorsWorkforce |> Seq.iter (fun idx -> idx.Receive (60000)) duplicates actorsWorkforce when executed.

Is this correct? How can I fix this? Thanks!

EDIT

The entire code:

open System
open System.Diagnostics
open Microsoft.FSharp.Control
open System.Threading
open System.Threading.Tasks
open System.Collections.Concurrent


////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// OnlyLatest
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/// Content of the mailbox slot: either empty (None) or a single message paired
/// with the time at which it was posted.
type Envelope<'a> = Option<DateTime * 'a> 

/// Reply handle carried inside request messages; it wraps the callback that
/// delivers the answer to whoever is waiting for it.
[<Sealed>]
type AsyncReplyChannel<'Reply>(replyf : 'Reply -> unit) =
    /// Hands `reply` to the callback supplied at construction.
    member this.Reply reply = replyf reply


/// Mailbox that keeps only the most recently posted message: every Post
/// overwrites the single envelope slot, and the receive loop handles whatever
/// is in the slot when it wakes up.
// NOTE(review): the slot, the flags and the timestamps are plain mutable
// fields shared between the receive loop and the posting workflows; no lock
// is visible in this type - confirm the intended threading model.
[<Sealed>]
type OnlyLatestMBP<'a> () =

    // Single-slot "queue": the latest posted message, or None when empty.
    let mutable currentEnvelope: Envelope<'a> = Envelope<'a>.None
    // Timestamp of the last message handed to the callback; only messages
    // with a strictly newer timestamp are processed.
    let mutable timestampLastPrcsd: DateTime = DateTime.Now
    // Message callback; unset until Receive stores one.
    let mutable react = Unchecked.defaultof<_>

    // Msg Box status
    // isActive is set by Receive and cleared when the receive loop times out.
    let mutable isActive = false
    let mutable defaultTimeout = Timeout.Infinite

    // Event Messages
    // awaitMsg: set by Post after a message has been stored in the slot.
    let awaitMsg = new AutoResetEvent(false)
    // isActiveMsg: set by Receive; Post waits on it while the mailbox is inactive.
    let isActiveMsg = new AutoResetEvent(false) 

    // Receive loop. With a message in the slot: run the callback if the message
    // is newer than the last processed one, clear the slot and loop. With an
    // empty slot: wait for awaitMsg for up to `timeout` ms, then either loop or
    // mark the mailbox inactive and stop.
    let rec await timeout = async {
        let thr = Thread.CurrentThread.ManagedThreadId
        printfn "await on thread %i" thr
        match currentEnvelope with
        | Some (timestamp, x) -> 
            if timestamp > timestampLastPrcsd then
                do! react x
                timestampLastPrcsd <- timestamp               
                printfn "processed message"
            // NOTE(review): the slot is cleared after `react` has returned, so a
            // message posted while `react` was running appears to be discarded
            // unprocessed - confirm this is intended.
            currentEnvelope <- Envelope.None
            awaitMsg.Reset() |> ignore
            return! await timeout
        | None -> 
            let! recd = Async.AwaitWaitHandle(awaitMsg, timeout)
            if recd
            then return! await timeout    
            else  
                // Timed out without a message: deactivate and end the loop.
                isActive <- false
                isActiveMsg.Reset() |> ignore
                printfn ".. no message within timeout, shutting down"  }

    /// Timeout used by Receive, Post and PostAndAsyncReply when the caller
    /// does not pass one. Initially Timeout.Infinite.
    member x.DefaultTimeout
        with get() = defaultTimeout
        and set(value) = defaultTimeout <- value

    /// Marks the mailbox active, stores `callback` as the message handler and
    /// starts the receive loop in the background with Async.Start.
    // NOTE(review): the loop is started on every call, including when the
    // mailbox is already active - repeated calls look like they would run
    // several loops side by side; confirm.
    member x.Receive (callback, ?timeout) = 
        if not isActive then 
            isActive <- true
            isActiveMsg.Set() |> ignore
        let timeout = defaultArg timeout defaultTimeout  
        react <- callback 
        let todo = await timeout
        Async.Start todo  

    /// Stores `msg` (stamped with DateTime.Now) in the slot and signals the
    /// receive loop. While the mailbox is inactive it first waits up to
    /// `timeout` ms for isActiveMsg; the async yields false if that wait times
    /// out and true once the message has been stored.
    member x.Post (msg, ?timeout) = async {
        let thr = Thread.CurrentThread.ManagedThreadId
        printfn "posting on thread %i" thr
        let timeout = defaultArg timeout defaultTimeout
        if not isActive then 
            let! recd = Async.AwaitWaitHandle(isActiveMsg, timeout)
            if recd then 
                currentEnvelope <- Envelope.Some(DateTime.Now, msg)
                awaitMsg.Set() |> ignore       
                return true
            else return false
        else             
            currentEnvelope <- Envelope.Some(DateTime.Now, msg) 
            awaitMsg.Set() |> ignore  
            return true  }

    /// Builds a request with `replyChannelMsg`, posts it and waits for the
    /// reply. Yields Some reply on success, and None when the post was not
    /// accepted or no reply arrived within `timeout` ms.
    member x.PostAndAsyncReply (replyChannelMsg, ?timeout) = async {
        let timeout = defaultArg timeout defaultTimeout
        // The reply channel completes this task source when Reply is called.
        let tcs = new TaskCompletionSource<_>()  
        let msg = replyChannelMsg ( new AsyncReplyChannel<_> (fun reply -> tcs.SetResult(reply)) )
        let! posted = x.Post (msg,timeout)
        if posted then
            match timeout with
            | Timeout.Infinite -> 
                // No deadline: resume the workflow from a task continuation.
                let! result = Async.FromContinuations ( fun (cont, _, _) ->
                    let apply = fun (task: Task<_>) -> cont (task.Result)
                    tcs.Task.ContinueWith(apply) |> ignore )
                return Some result 
            | _ ->
                // NOTE(review): Task.Wait blocks the thread running this
                // workflow for up to `timeout` ms; a non-blocking wait may be
                // preferable - confirm.
                let waithandle = tcs.Task.Wait(timeout)
                match waithandle with
                | false -> return None
                | true -> return Some tcs.Task.Result 
        else return None }



/// Messages understood by TestActor.
type TestMessage =
   /// Adds the given amount to the actor's counter.
   | Add of int
   /// Requests the current counter value, delivered through the reply channel.
   | GetCounter of AsyncReplyChannel<int>


/// Test actor that keeps an integer counter behind an OnlyLatestMBP mailbox.
/// The mailbox stays idle until Receive is called on the actor.
type TestActor (init) =
    let mutable counter = init

    let mailbox = OnlyLatestMBP<TestMessage> ()

    // Message handler: GetCounter replies with the current value, Add mutates it.
    let rcvFun message = 
        async {
            match message with
            | GetCounter channel -> channel.Reply counter
            | Add delta -> counter <- counter + delta
        }

    // Construction-time start, disabled in favour of the explicit Receive member:
//    do printfn "Initializing actors: "
//    do mailbox.Receive (rcvFun, timeout)

    /// Starts the mailbox with this actor's handler and the given timeout (ms).
    member this.Receive (timeout) = mailbox.Receive (rcvFun, timeout)

    /// Posts a message to the mailbox, waiting at most `timeout` ms for it to
    /// become active.
    member this.Post (msg: TestMessage, timeout) = mailbox.Post (msg, timeout)

    /// Posts a request built by `replyChannel` and asynchronously waits for the reply.
    member this.PostAndAsyncReply (replyChannel, timeout) = 
        mailbox.PostAndAsyncReply (replyChannel, timeout)




// NOTE(review): Seq.map is lazy, so `actorsWorkforce` is a recipe for building
// actors rather than a fixed collection: every enumeration of it runs the
// TestActor constructor again and yields five brand-new actors.
let actorsWorkforce = 
    seq { 1 .. 5}
    |> Seq.map (fun idx -> TestActor(idx))


// First enumeration: five actors are created and Receive is called on them.
// No reference to these instances is kept.
actorsWorkforce |> Seq.iter (fun actor -> actor.Receive (60000)) 


// Second enumeration: five *different* actors are created. Receive was never
// called on these, so each request below ends in None.
let test = 
    actorsWorkforce 
    |> Seq.map ( fun idx -> idx.PostAndAsyncReply ( (fun reply -> GetCounter reply), 10000) )
    |> Async.Parallel
    |> Async.RunSynchronously 


// Prints one line per actor: the counter it reported, or "Failed" for None.
let result = 
    test 
    |> Array.iteri (fun idx element ->
        match element with
        | Some x -> printfn "Actor %i: OK with result %A" idx x
        | None -> printfn "Actor %i: Failed" idx )
5
  • This code isn't complete - definitions for OnlyLatestMBP and TestMessage are missing. Although I guess that changing to let actorsWorkforce() ... might work. Commented Feb 11, 2014 at 9:28
  • Tnx for looking into this. I've added the complete code. The question is basically how to apply Receive to a data structure of actors. Commented Feb 11, 2014 at 18:30
  • Wow, this is a lot of code! Commented Feb 12, 2014 at 0:13
  • @NoIdeaHowToFixThis - What we really want is a simple minimal example that demonstrates the problem. It is quite likely that much of the code you have shown is not required to show the problem. See sscce.org for some guidance. Commented Feb 12, 2014 at 1:58
  • @JohnPalmer - Well, most of the code is just boilerplate to start, communicate and shut down the agent. Anyhow, I just figured out what was the problem. Thanks for your help. Commented Feb 12, 2014 at 9:33

1 Answer 1

1

As initially suspected, the issue was indeed with: actorsWorkforce |> Seq.iter (fun idx -> idx.Receive (60000))

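The problem was due to the lazy nature of seq: a sequence built with Seq.map is re-evaluated on every enumeration, so Seq.iter called Receive on one set of freshly constructed actors, while the later Seq.map constructed (and queried) a second set of actors on which Receive had never been called.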

I have produced a narrowed down minimal code example.

open System
open System.Diagnostics
open Microsoft.FSharp.Control
open System.Threading
open System.Threading.Tasks
open System.Collections.Concurrent

/// Minimal stand-in for an agent: once started it prints a line every
/// `timerlength` seconds, and it can report whether Start has been called.
type TestActress (name, timerlength) = 
    let mutable isActive = false

    // Endless background loop: wait `timerlength` seconds, report, repeat.
    // Async.Sleep is used rather than Thread.Sleep so that waiting does not
    // hold on to a thread-pool thread and the wait can be cancelled.
    let rec myTask () = async {
        do! Async.Sleep (timerlength * 1000)
        printfn "%s waited : %i" name timerlength
        return! myTask () }

    /// Marks the instance as active and starts the background loop.
    member this.Start () = 
        isActive <- true
        Async.Start (myTask ())

    /// Yields, after a two-second non-blocking delay, whether Start has been
    /// called on this instance.
    member this.GetStatus () = async {
        do! Async.Sleep 2000
        return  isActive }

// A single instance is straightforward: create it, start it, then read its
// status synchronously.
let cameronDiaz = TestActress ("Cameron", 10)
cameronDiaz.Start ()
let status = Async.RunSynchronously (cameronDiaz.GetStatus ())

// Async.Parallel receives a seq<Async<'T>> as an input
// This is why I started off with a seq
// This piece of code does not work: `hollywood` is a lazy sequence, so each
// enumeration constructs new TestActress instances. Seq.iter starts one set of
// instances; the Seq.map below builds a second set that was never started, so
// every GetStatus yields false.
let hollywood = 
    [ "Cameron"; "Pamela"; "Natalie"; "Diane" ] 
    |> List.toSeq
    |> Seq.mapi ( fun idx el -> TestActress (el, idx + 10) )
// First enumeration: these instances are started and then dropped.
hollywood |> Seq.iter ( fun el -> el.Start () ) 
// Second enumeration: fresh, unstarted instances are queried.
let areTheyWorking =
    hollywood
    |> Seq.map (fun el -> el.GetStatus ()) 
    |> Async.Parallel
    |> Async.RunSynchronously

// A list is built eagerly, exactly once, so the very same TestActress
// instances are first started and then queried.
let hollywood2 = 
    [ "Cameron"; "Pamela"; "Natalie"; "Diane" ] 
    |> List.mapi ( fun position actressName -> TestActress (actressName, position + 10) )
for actress in hollywood2 do
    actress.Start ()
let areTheyWorking2 =
    [ for actress in hollywood2 -> actress.GetStatus () ]
    |> Async.Parallel
    |> Async.RunSynchronously
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.