I am surprised by a stack overflow in my async-based program. I suspect the main problem is with the following function, which is supposed to compose two async computations to execute in parallel and wait for both to finish:
    let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
        async {
            let! x = Async.StartChild a
            let! y = Async.StartChild b
            do! x
            do! y
        }
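To make the intended behaviour concrete, here is a minimal usage sketch. The names `results` and `record` are hypothetical helpers introduced only for this illustration; the combinator is repeated so the snippet stands alone.

```fsharp
open System.Collections.Concurrent

// The combinator as defined above, repeated so the snippet is self-contained.
let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
    async {
        let! x = Async.StartChild a
        let! y = Async.StartChild b
        do! x
        do! y
    }

// Hypothetical helpers: each computation records that it ran.
let results = ConcurrentBag<int>()
let record (n: int) : Async<unit> = async { results.Add n }

// Both children are started before either is awaited,
// and the composed computation completes only after both have finished.
(record 1 <|> record 2) |> Async.RunSynchronously
printfn "computations completed: %d" results.Count
```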
With this defined, I have the following mapReduce program that attempts to exploit parallelism in both the map and the reduce parts. Informally, the idea is to spawn N mappers and N-1 reducers that share a channel, wait for them all to finish, and then read the result from the channel. I had my own Channel implementation, replaced here by a ConcurrentBag to keep the code short (the problem occurs with both):
    let mapReduce (map    : 'T1 -> Async<'T2>)
                  (reduce : 'T2 -> 'T2 -> Async<'T2>)
                  (input  : seq<'T1>) : Async<'T2> =
        let bag = System.Collections.Concurrent.ConcurrentBag()
        let rec read () =
            async {
                match bag.TryTake() with
                | true, value -> return value
                | _ -> do! Async.Sleep 100
                       return! read ()
            }
        let write x =
            bag.Add x
            async.Return ()
        let reducer =
            async {
                let! x = read ()
                let! y = read ()
                let! r = reduce x y
                return bag.Add r
            }
        let work =
            input
            |> Seq.map (fun x -> async.Bind(map x, write))
            |> Seq.reduce (fun m1 m2 -> m1 <|> m2 <|> reducer)
        async {
            do! work
            return! read ()
        }
Now the following basic test throws a StackOverflowException at n=10000:
    let test n =
        let map x = async.Return x
        let reduce x y = async.Return (x + y)
        mapReduce map reduce [0..n]
        |> Async.RunSynchronously
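Note that `Seq.reduce` builds a left-nested chain of `<|>` whose depth grows with the input length. If that depth turns out to matter, a flat formulation might look like the sketch below. It keeps the same bag-based protocol but hands all N mappers and N-1 reducers to a single `Async.Parallel` call. The names `mapReduceFlat`, `mapper`, `items` and `jobs` are hypothetical, and this has not been verified against the overflow at n=10000.

```fsharp
open System.Collections.Concurrent

// Sketch only: same protocol as mapReduce, without the nested <|> chain.
// Assumes a non-empty input; with an empty input the final read never returns.
let mapReduceFlat (map    : 'T1 -> Async<'T2>)
                  (reduce : 'T2 -> 'T2 -> Async<'T2>)
                  (input  : seq<'T1>) : Async<'T2> =
    let bag = ConcurrentBag<'T2>()
    // Poll the bag until a value is available.
    let rec read () =
        async {
            match bag.TryTake() with
            | true, value -> return value
            | _ ->
                do! Async.Sleep 100
                return! read ()
        }
    // Run the map step and publish its result to the bag.
    let mapper x =
        async {
            let! y = map x
            bag.Add y
        }
    // Take two values, combine them, and publish the result.
    let reducer =
        async {
            let! x = read ()
            let! y = read ()
            let! r = reduce x y
            bag.Add r
        }
    let items = Seq.toArray input
    // N mappers followed by N-1 reducers, all in one flat array.
    let jobs =
        Array.append (Array.map mapper items)
                     (Array.replicate (max 0 (items.Length - 1)) reducer)
    async {
        let! _ = Async.Parallel jobs
        return! read ()
    }
```

It would be called the same way as `mapReduce` in the test above, for example `mapReduceFlat map reduce [0..n] |> Async.RunSynchronously`.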
EDIT: An alternative implementation of the <|> combinator makes the test succeed at n=10000:
    let ( <|> ) (a: Async<unit>) (b: Async<unit>) =
        Async.FromContinuations(fun (ok, _, _) ->
            let count = ref 0
            let ok () =
                lock count (fun () ->
                    match !count with
                    | 0 -> incr count
                    | _ -> ok ())
            Async.Start <|
                async {
                    do! a
                    return ok ()
                }
            Async.Start <|
                async {
                    do! b
                    return ok ()
                })
This really surprises me, because it is what I assumed Async.StartChild was doing. Any thoughts on which solution would be optimal?
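For comparison, one more way to express the combinator would be on top of the built-in `Async.Parallel`. This is only a sketch: the name `bothParallel` is hypothetical, and whether it avoids the overflow at this nesting depth is not verified here.

```fsharp
// Sketch: run both computations with Async.Parallel and discard the unit results.
let bothParallel (a: Async<unit>) (b: Async<unit>) : Async<unit> =
    async {
        let! _ = Async.Parallel [ a; b ]
        return ()
    }

// Usage: completes once both sleeps have finished.
bothParallel (Async.Sleep 10) (Async.Sleep 20) |> Async.RunSynchronously
```

Unlike the FromContinuations version, this variant leaves exception and cancellation propagation to the library rather than dropping those continuations.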