Asynchronous barrier in F#

I wrote a program in F# that asynchronously lists all directories on disk. An async task lists all files in a given directory and creates separate async tasks (daemons: I start them using Async.Start) to list subdirectories. They all send their results to a central MailboxProcessor.

My problem is: how do I detect that all the daemon tasks have finished and that no more files will arrive? Essentially, I need a barrier for all tasks that are (direct or indirect) children of my top task. I couldn't find anything like that in F#'s async model.

What I did instead is create a separate MailboxProcessor where I register each task's start and termination. When the active count goes to zero, I'm done. But I'm not happy with that solution. Any other suggestions?
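For reference, the register/deregister bookkeeping described above might look roughly like this. It is a minimal sketch, not the asker's actual code: the `Dir` type is a hypothetical in-memory stand-in for the file system (so it runs without touching the disk), and `TrackerMsg`, `makeTracker` and `listAll` are made-up names. The key detail is that a parent registers each child *before* starting it, so the count cannot reach zero while work is still pending.

```fsharp
open System.Collections.Concurrent

// Hypothetical in-memory stand-in for the file system:
// a directory is a name plus its subdirectories.
type Dir = Dir of string * Dir list

// Messages understood by the bookkeeping agent.
type TrackerMsg =
    | Started                                  // a task has been registered
    | Finished                                 // a registered task is done
    | WaitUntilIdle of AsyncReplyChannel<unit> // reply once no tasks remain

let makeTracker () =
    MailboxProcessor<TrackerMsg>.Start(fun inbox ->
        // active: registered tasks that have not finished yet
        // waiters: callers to wake up when active drops to zero
        let rec loop active (waiters: AsyncReplyChannel<unit> list) =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Started -> return! loop (active + 1) waiters
                | Finished when active = 1 ->
                    for w in waiters do w.Reply()
                    return! loop 0 []
                | Finished -> return! loop (active - 1) waiters
                | WaitUntilIdle w when active = 0 ->
                    w.Reply()
                    return! loop active waiters
                | WaitUntilIdle w -> return! loop active (w :: waiters)
            }
        loop 0 [])

let listAll (root: Dir) : string list =
    let tracker = makeTracker ()
    let seen = ConcurrentBag<string>()
    let rec visit (Dir(name, subdirs)) =
        async {
            try
                seen.Add name
                for d in subdirs do
                    tracker.Post Started   // register the child before starting it
                    Async.Start(visit d)
            finally
                tracker.Post Finished      // deregister this task when its body is done
        }
    tracker.Post Started                   // register the root task
    Async.Start(visit root)
    tracker.PostAndReply WaitUntilIdle     // blocks until the count reaches zero
    List.ofSeq seen

let sample =
    Dir("root",
        [ Dir("a", [ Dir("a1", []); Dir("a2", []) ])
          Dir("b", [ Dir("b1", [ Dir("b1x", []) ]) ])
          Dir("c", []) ])

listAll sample |> List.sort |> printfn "%A"
```

Because every message goes through one agent, this is also the single point of contention the asker later describes as a bottleneck.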

Proceeds answered 5/1, 2011 at 20:28 Comment(1)
What is wrong with your solution? Perhaps you can use one agent that supervises the work to do, and lots of agents that request work. The workers get a directory to explore, get the files in the corresponding directory, then post the results back to a supervisor (not necessarily the same as the first) and post the subdirectories to explore to the work supervisor. - Tevet

Have you tried using Async.Parallel? That is, rather than calling Async.Start for each subdirectory, just combine the subdirectory tasks into a single async via Async.Parallel. Then you end up with a (nested) fork-join task that you can RunSynchronously and await the final result.

EDIT

Here is some approximate code that shows the gist, if not the full detail:

open System.IO

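// Central agent: prints every directory name it is sent.
let agent = MailboxProcessor.Start(fun mbox ->
    async {
        while true do
            let! msg = mbox.Receive()
            printfn "%s" msg
    })

// Report this directory, then traverse its subdirectories in parallel;
// the returned async completes only once the whole subtree has been walked.
let rec traverse dir =
    async {
        agent.Post(dir)
        let subDirs = Directory.EnumerateDirectories(dir)
        return! [for d in subDirs do yield traverse d]
                 |> Async.Parallel |> Async.Ignore
    }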

traverse "d:\\" |> Async.RunSynchronously
// now all will be traversed, 
// though Post-ed messages to agent may still be in flight

EDIT 2

Here is the waiting version that uses replies:

open System.IO

let agent = MailboxProcessor.Start(fun mbox ->
    async {
        while true do
            let! dir, (replyChannel:AsyncReplyChannel<unit>) = mbox.Receive()
            printfn "%s" dir
            replyChannel.Reply()
    })

let rec traverse dir =
    async {
        let r = agent.PostAndAsyncReply(fun replyChannel -> dir, replyChannel)
        let subDirs = Directory.EnumerateDirectories(dir)
        do! [for d in subDirs do yield traverse d] 
                 |> Async.Parallel |> Async.Ignore 
        do! r // wait for the agent's reply, i.e. until it has processed dir
    }

traverse "c:\\Projects\\" |> Async.RunSynchronously
// now all will be traversed to completion 
Cuspid answered 5/1, 2011 at 22:23 Comment(13)
The idea is to start listing as soon as possible and, while doing so and discovering new subdirectories, keep adding (and starting) new tasks. The discovery of new subdirectories is interleaved with the listing of files. There is no good moment to combine all the tasks using Async.Parallel. - Proceeds
I don't understand. Assuming you now have e.g. "foreach subdir, Async.Start a daemon", change it approximately to "[foreach subdir do yield daemon] |> Async.Parallel" and return the computation that 'kicks everything off'. I can spell out the code in more detail if needed. - Cuspid
The thing is that the daemon has its own daemons and so on, depending on the depth of the hierarchy. Imagine that there are no files, just directories. If I understand your solution correctly, you'd have them all listed before you even start your parallel tasks. Is that correct? - Proceeds
I don't think so; check out my recent edit of the answer to see if it helps. - Cuspid
Your comment "messages may still be in flight" summarizes my concerns. There is no way for the program to know when the listing is complete. - Proceeds
Right, to deal with that you could use PostAndAsyncReply instead of Post. E.g. the agent could reply after it is done processing, and the async inside traverse would make waiting on the reply one of the things to execute in parallel, so that the whole thing blocks until that is done. - Cuspid
@Brian What about putting r in the list passed to Async.Parallel? - Zulazulch
This is what I was looking for. Thanks Brian, Eric. My solution had a bottleneck: a single mailbox where all tasks had to register and deregister. Yours avoids this bottleneck by using more resources: it creates an anonymous reply channel for each directory. - Proceeds
Is this O(n) space instead of O(1)? - Gideon
@Jon, this is O(number of concurrent threads * average number of subdirectories per directory) space. This could be reduced to O(number of concurrent threads) if a sequence were used instead of a list. - Zulazulch
@Jon I looked at the F# source code and found that Async.Parallel calls Seq.toArray on its input, so using a sequence in the above example won't reduce it to O(number of concurrent threads). - Zulazulch
@gradbot: Waiting for all children to complete like this is a bad idea because you consume space for every element passed to Async.Parallel until they all complete, rather than consuming space just for outstanding workflows. This was one of the design flaws I fixed when updating the web crawler sample from my original F# for Technical Computing (2009) book for the latest Visual F# 2010 for Technical Computing (2010). - Gideon
Moreover, the whole exercise is rather futile without an asynchronous function to read directories! - Gideon
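Eric's suggestion of putting `r` into the list passed to Async.Parallel could be sketched as follows. This is a rough illustration, not code from the thread: the in-memory `Dir` type and the `Agent`, `makeAgent` and `names` identifiers are hypothetical, used so the example runs without a disk. Waiting on the agent's reply becomes just one more branch of the fork-join, so when the top-level async completes, every directory has been both traversed and processed. Note that Gideon's space caveat still applies: every child of a directory is held until all of them finish.

```fsharp
// Hypothetical in-memory stand-in for the file system.
type Dir = Dir of string * Dir list

// The agent receives a directory name plus a channel to reply on.
type Agent = MailboxProcessor<string * AsyncReplyChannel<unit>>

let makeAgent (names: ResizeArray<string>) : Agent =
    Agent.Start(fun inbox ->
        async {
            while true do
                let! name, reply = inbox.Receive()
                names.Add name    // only the agent touches names, so no lock is needed
                reply.Reply()     // tell the sender this directory has been processed
        })

let rec traverse (agent: Agent) (Dir(name, subdirs)) =
    async {
        // Waiting for the agent's reply is just one more parallel branch.
        let processed = agent.PostAndAsyncReply(fun reply -> name, reply)
        let children = [ for d in subdirs -> traverse agent d ]
        do! (processed :: children) |> Async.Parallel |> Async.Ignore
    }

let sample =
    Dir("root",
        [ Dir("a", [ Dir("a1", []); Dir("a2", []) ])
          Dir("b", [ Dir("b1", [ Dir("b1x", []) ]) ])
          Dir("c", []) ])

let names = ResizeArray<string>()
traverse (makeAgent names) sample |> Async.RunSynchronously
// Every directory has now been traversed *and* handled by the agent.
printfn "%d directories processed" names.Count
```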

You could just use Interlocked to increment and decrement as you begin/end tasks, and be all done when it goes to zero. I've used this strategy in similar code with MailboxProcessors.
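A minimal sketch of the Interlocked approach, under the same assumptions as the other examples: `Dir` is a hypothetical in-memory stand-in for directories, and `listAll` is a made-up name. A ref cell holds the count of started-but-unfinished tasks (`&pending.contents` lets Interlocked update it in place), each child is counted before it is started, and whichever task brings the count to zero signals a ManualResetEventSlim.

```fsharp
open System.Collections.Concurrent
open System.Threading

// Hypothetical in-memory stand-in for the file system.
type Dir = Dir of string * Dir list

let listAll (root: Dir) : string list =
    let pending = ref 1   // started-but-unfinished tasks; the root counts as one
    use allDone = new ManualResetEventSlim(false)
    let seen = ConcurrentBag<string>()
    let finish () =
        // Whoever brings the counter to zero signals that everything is done.
        if Interlocked.Decrement(&pending.contents) = 0 then allDone.Set()
    let rec visit (Dir(name, subdirs)) =
        async {
            try
                seen.Add name
                for d in subdirs do
                    // Count the child before starting it, so the total
                    // cannot reach zero while work is still outstanding.
                    Interlocked.Increment(&pending.contents) |> ignore
                    Async.Start(visit d)
            finally
                finish ()
        }
    Async.Start(visit root)
    allDone.Wait()
    List.ofSeq seen

let sample =
    Dir("root",
        [ Dir("a", [ Dir("a1", []); Dir("a2", []) ])
          Dir("b", [ Dir("b1", [ Dir("b1x", []) ]) ])
          Dir("c", []) ])

listAll sample |> List.sort |> printfn "%A"
```

The mutation is confined to one counter, which is the trade-off the asker objects to in the comment below.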

Cuspid answered 5/1, 2011 at 20:38 Comment(1)
I was trying to avoid mutation. - Proceeds

You may be better off just using Task.Factory.StartNew() and Task.WaitAll().
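A rough F# sketch of the Task-based route. Rather than calling Task.WaitAll at every level (which would block a thread-pool thread per directory), it swaps in `TaskCreationOptions.AttachedToParent`: a parent task does not complete until its attached children have, so a single `Wait` on the root task acts as the barrier over the dynamically growing tree. `Dir` and `listAll` are hypothetical names. Task.Factory.StartNew is used on purpose, because tasks created with Task.Run deny child attachment.

```fsharp
open System
open System.Collections.Concurrent
open System.Threading.Tasks

// Hypothetical in-memory stand-in for the file system.
type Dir = Dir of string * Dir list

let listAll (root: Dir) : string list =
    let seen = ConcurrentBag<string>()
    let rec visit (Dir(name, subdirs)) : unit =
        seen.Add name
        for d in subdirs do
            // AttachedToParent: the task running this call is not considered
            // complete until every child attached here has completed too.
            Task.Factory.StartNew(Action(fun () -> visit d), TaskCreationOptions.AttachedToParent)
            |> ignore
    let rootTask = Task.Factory.StartNew(Action(fun () -> visit root))
    rootTask.Wait()   // one wait covers the whole tree of attached tasks
    List.ofSeq seen

let sample =
    Dir("root",
        [ Dir("a", [ Dir("a1", []); Dir("a2", []) ])
          Dir("b", [ Dir("b1", [ Dir("b1x", []) ]) ])
          Dir("c", []) ])

listAll sample |> List.sort |> printfn "%A"
```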

Gluteal answered 5/1, 2011 at 23:15 Comment(1)
I guess I could. But that's a C# solution, and my goal is to find out whether F# is better/simpler at multitasking. Maybe it just isn't :-( - Proceeds

This is probably a learning exercise, but it seems that you would be happy with a lazy sequence of all of the directories. Adapting Brian's answer above (and I think something like this is in all of the F# books, which I don't have with me at home):

open System.IO

let rec traverse dir =
    seq {
        let subDirs = Directory.EnumerateDirectories(dir)
        yield dir
        for d in subDirs do
            yield! traverse d
    }

For what it is worth, I have found the Async workflow in F# very useful for "embarrassingly easy" parallel problems, though I haven't tried much general multitasking.

Toothpaste answered 7/1, 2011 at 13:0 Comment(0)

Just for clarification: I thought there might be a better solution, similar to what one can do in Chapel. There you have a "sync" statement, a barrier that waits for all the tasks spawned within a statement to finish. Here's an example from the Chapel manual:

def concurrentUpdate(tree: Tree) {
    if requiresUpdate(tree) then
        begin update(tree);
    if !tree.isLeaf {
        concurrentUpdate(tree.left);
        concurrentUpdate(tree.right);
    }
}
sync concurrentUpdate(tree);

The "begin" statement creates a task that runs in parallel, somewhat like an F# async block started with Async.Start.
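For comparison, the Chapel example might map onto the Async.Parallel approach from Brian's answer like this. It is a rough sketch, not a general "sync" construct: `Cell`, `Tree`, `cellOf`, `requiresUpdate` and `update` are hypothetical stand-ins for the Chapel code's types and functions. Each node's update becomes one more branch alongside the recursive calls, and because Async.Parallel only completes when every branch has, running the top-level async to completion plays the role of `sync`. Unlike Chapel, the fork-join structure has to be written out explicitly; a stray Async.Start anywhere inside would escape the barrier.

```fsharp
// Hypothetical tree: every node holds a mutable cell; a leaf has no children.
type Cell = { mutable Value: int }

type Tree =
    | Leaf of Cell
    | Node of Cell * Tree * Tree

// Hypothetical stand-ins for requiresUpdate/update: make negative values positive.
let requiresUpdate (c: Cell) = c.Value < 0
let update (c: Cell) = c.Value <- -c.Value

let cellOf tree =
    match tree with
    | Leaf c
    | Node(c, _, _) -> c

let rec concurrentUpdate (tree: Tree) : Async<unit> =
    async {
        let cell = cellOf tree
        // "begin update(tree)": the update is one more parallel branch.
        let own = if requiresUpdate cell then [ async { update cell } ] else []
        let children =
            match tree with
            | Leaf _ -> []
            | Node(_, left, right) -> [ concurrentUpdate left; concurrentUpdate right ]
        // Completes only when the update and both subtrees have completed.
        do! (own @ children) |> Async.Parallel |> Async.Ignore
    }

let tree =
    Node({ Value = -1 },
         Leaf { Value = 2 },
         Node({ Value = -3 }, Leaf { Value = -4 }, Leaf { Value = 5 }))

// The equivalent of "sync concurrentUpdate(tree);"
concurrentUpdate tree |> Async.RunSynchronously
```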

Proceeds answered 17/1, 2011 at 18:6 Comment(1)
You could possibly make your own computation expression that does this, or extend the Async type. - Zulazulch
