A MailboxProcessor that operates with a LIFO logic

I am learning about F# agents (MailboxProcessor).

I am dealing with a rather unconventional problem.

  • I have one agent (dataSource) which is a source of streaming data. The data has to be processed by an array of agents (dataProcessor). We can consider dataProcessor as some sort of tracking device.
  • Data may flow in faster than dataProcessor is able to process it.
  • It is OK to have some delay. However, I have to ensure that the agent stays on top of its work and does not get buried under obsolete observations.

I am exploring ways to deal with this problem.

The first idea is to implement a stack (LIFO) in dataSource. dataSource would send the latest available observation whenever dataProcessor becomes available to receive and process data. This solution may work, but it may get complicated: dataProcessor may need to be blocked and re-activated, and it would have to communicate its status to dataSource, leading to a two-way communication problem. This may boil down to a blocking queue in the producer-consumer problem, but I am not sure.

The second idea is to have dataProcessor take care of message sorting. In this architecture, dataSource simply posts updates to dataProcessor's queue, and dataProcessor uses Scan to fetch the latest data available in its queue. This may be the way to go. However, I am not sure whether the current design of MailboxProcessor makes it possible to clear a queue of messages, deleting the older, obsolete ones. Furthermore, here, it is written that:

Unfortunately, the TryScan function in the current version of F# is broken in two ways. Firstly, the whole point is to specify a timeout but the implementation does not actually honor it. Specifically, irrelevant messages reset the timer. Secondly, as with the other Scan function, the message queue is examined under a lock that prevents any other threads from posting for the duration of the scan, which can be an arbitrarily long time. Consequently, the TryScan function itself tends to lock-up concurrent systems and can even introduce deadlocks because the caller's code is evaluated inside the lock (e.g. posting from the function argument to Scan or TryScan can deadlock the agent when the code under the lock blocks waiting to acquire the lock it is already under).

Having the latest observation bounced back may be a problem. The author of this post, @Jon Harrop, suggests that

I managed to architect around it and the resulting architecture was actually better. In essence, I eagerly Receive all messages and filter using my own local queue.

This idea is surely worth exploring but, before starting to play around with code, I would welcome some input on how I could structure my solution.

Thank you.

Fraze answered 29/1, 2014 at 9:42 Comment(1)
FWIW, I just tested the TryScan timeout bug in F# 3.1.1 and it has been fixed. - Prophetic

tl;dr I would try this: take the Mailbox implementation from FSharp.Actor or Zach Bray's blog post, replace ConcurrentQueue with ConcurrentStack (plus some bounded-capacity logic), and use this modified agent as a dispatcher that passes messages from dataSource to an army of dataProcessors implemented as ordinary MBPs or Actors.

tl;dr2 If workers are a scarce and slow resource and we need to process the message that is the latest at the moment a worker becomes ready, then it all boils down to an agent with a stack instead of a queue (with some bounded-capacity logic) plus a blocking queue of workers. The dispatcher dequeues a ready worker, pops a message from the stack, and sends that message to the worker. After the job is done, the worker enqueues itself in the queue when it becomes ready (e.g. just before let! msg = inbox.Receive()). The dispatcher's consumer thread then blocks until any worker is ready, while the producer thread keeps the bounded stack updated. (A bounded stack could be done with an array + offset + size inside a lock; the design described below is more complex than necessary.)
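
The dispatcher loop described in tl;dr2 could be sketched roughly as follows. This is only an illustration under assumptions: readyWorkers, startWorker and dispatchOne are hypothetical names, messages are plain ints, the "work" is just recording the message, and the bounded-capacity logic is left out.

```fsharp
open System.Collections.Concurrent

// Idle workers wait here; the dispatcher blocks on Take() until one is ready.
let readyWorkers = new BlockingCollection<MailboxProcessor<int>>()

// Hypothetical worker: announces readiness, then waits for exactly one message.
let startWorker (processed: ConcurrentQueue<int>) =
    MailboxProcessor<int>.Start(fun inbox ->
        let rec loop () =
            async {
                readyWorkers.Add inbox        // enqueue itself before Receive
                let! msg = inbox.Receive()
                processed.Enqueue msg         // stand-in for the real processing
                return! loop ()
            }
        loop ())

// One dispatch step: wait for an idle worker, then hand it the newest message.
let dispatchOne (stack: ConcurrentStack<int>) =
    let worker = readyWorkers.Take()          // blocks until a worker is ready
    match stack.TryPop() with
    | true, msg ->
        worker.Post msg
        Some msg
    | _ ->
        readyWorkers.Add worker               // nothing to send; return the worker
        None

// Usage: three observations arrive before the single worker becomes free.
let stack = ConcurrentStack<int>()
for obs in [ 1; 2; 3 ] do
    stack.Push obs
let processed = ConcurrentQueue<int>()
startWorker processed |> ignore
let first = dispatchOne stack                 // the newest observation goes first
let second = dispatchOne stack                // waits until the worker is idle again
```

Because the worker re-enqueues itself only after it has finished the previous message, the second call to dispatchOne blocks until the worker is idle again, so the worker's own mailbox never accumulates stale messages. Older observations stay on the stack in this sketch; trimming them is the bounded-capacity logic that is omitted here.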

Details

MailboxProcessor is designed to have only one consumer. This is even noted in a comment in the MBP source code here (search for the word 'DRAGONS' :) )

If you post your data to an MBP, then only one thread can take it from the internal queue or stack. In your particular use case I would use ConcurrentStack directly or, better, wrapped in a BlockingCollection:

  • It will allow many concurrent consumers
  • It is very fast and thread safe
  • BlockingCollection has a BoundedCapacity property that lets you limit the size of the collection. Add blocks while the collection is full, so use TryAdd, which returns false instead. If A is the main stack and B is a standby, then TryAdd to A; on false, Add to B and swap the two with Interlocked.Exchange, then process the needed messages in A, clear it, and make a new standby. Use three stacks if processing A could take longer than it takes B to fill up again. In this way you neither block nor lose any messages, but you can discard unneeded ones in a controlled way.
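
As a small illustration of the bounded, stack-backed collection mentioned above, here is a minimal sketch. The capacity of 3 and the names bccs, accepted and newest are arbitrary choices for the example; the two-stack swap is not shown.

```fsharp
open System.Collections.Concurrent

// A BlockingCollection backed by a ConcurrentStack, bounded to 3 items.
// Take() then returns the most recently added item (LIFO).
let bccs = new BlockingCollection<int>(ConcurrentStack<int>(), 3)

// TryAdd returns false immediately once the bounded capacity is reached,
// so the fourth item is rejected instead of blocking the producer.
let accepted = [ for i in 1 .. 4 -> bccs.TryAdd i ]

// The consumer gets the newest accepted observation first.
let newest = bccs.Take()
```

With this setup the producer never blocks; what to do with a rejected item (drop it, or push it to a standby stack as described above) is a separate design decision.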

BlockingCollection has methods like AddToAny/TakeFromAny, which work on arrays of BlockingCollections. This could help, e.g.:

  • dataSource produces messages to a BlockingCollection with ConcurrentStack implementation (BCCS)
  • another thread consumes messages from the BCCS and sends them to an array of processing BCCSs. You said that there is a lot of data, so you may sacrifice one thread that blocks and dispatches your messages indefinitely
  • each processing agent has its own BCCS or is implemented as an Agent/Actor/MBP to which the dispatcher posts messages. In your case you need to send a message to only one processorAgent, so you may store the processing agents in a circular buffer and always dispatch a message to the least recently used processor.

Something like this:

            (data stream produces 'T)
                |
            [dispatcher's BCCS]
                |
            (a dispatcher thread consumes 'T  and pushes to processors, manages capacity of BCCS and LRU queue)
                 |                               |
            [processor1's BCCS/Actor/MBP] ... [processorN's BCCS/Actor/MBP]
                 |                               |
               (process)                         (process)

Instead of ConcurrentStack, you may want to read about the heap data structure. If you need the latest message by some property of the messages, e.g. a timestamp, rather than by the order in which they arrive at the stack (e.g. if there could be delays in transit so that arrival order <> creation order), you can get the latest message by using a heap.
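
A minimal sketch of the heap idea, assuming .NET 6 or later, where System.Collections.Generic.PriorityQueue is available. The Observation type and the push helper are hypothetical, and PriorityQueue is not thread-safe, so a real dispatcher would have to guard it with a lock.

```fsharp
open System
open System.Collections.Generic

// Hypothetical message type; Timestamp is the creation time, not the arrival time.
type Observation = { Timestamp: DateTime; Value: float }

// PriorityQueue dequeues the smallest priority first, so negated ticks
// make the most recently created observation come out first.
let heap = PriorityQueue<Observation, int64>()

let push (obs: Observation) =
    heap.Enqueue(obs, -obs.Timestamp.Ticks)

let t0 = DateTime(2014, 1, 29, 9, 0, 0)
push { Timestamp = t0.AddSeconds(2.0); Value = 2.0 }
push { Timestamp = t0.AddSeconds(5.0); Value = 5.0 }   // created last
push { Timestamp = t0.AddSeconds(3.0); Value = 3.0 }   // arrives last

// The observation with the latest timestamp, regardless of arrival order.
let latest = heap.Dequeue()
```

Unlike a stack, this returns the observation created last even when it did not arrive last.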

If you still need Agent semantics/API, you could read several sources in addition to Dave's links, and adapt an implementation to multiple concurrent consumers:

  • An interesting article by Zach Bray on an efficient Actor implementation. There you do need to replace (under the comment // Might want to schedule this call on another thread.) the line execute true with async { execute true } |> Async.Start or similar, because otherwise the producing thread will also be the consuming thread, which is not good for a single fast producer. However, for a dispatcher like the one described above, this is exactly what is needed.

  • The FSharp.Actor (aka Fakka) development branch and the F# MBP source code (first link above) could be very useful for implementation details. The FSharp.Actor library has been frozen for several months, but there is some activity in the dev branch.

  • The discussion about Fakka in Google Groups should not be missed in this context.

I have a somewhat similar use case, and for the last two days I have researched everything I could find on F# Agents/Actors. This answer is a kind of TODO for myself to try these ideas, half of which were born while writing it.

Mycobacterium answered 30/1, 2014 at 0:7 Comment(8)
Many thanks for the input. I have a question: if the consumers are implemented as ordinary MBPs, then, due to indeterminacy in the delivery of messages coming from the dispatcher, they may still start processing observations that are not the latest available, right? In my design, I should try to ensure that processors are put to work on observations that are as fresh as possible. - Fraze
In this design the dispatcher ensures that the latest available message is always sent first. However, the dispatcher will be pretty fast to Post messages, and workers could accumulate old values... I should correct the text; it doesn't achieve your goal as initially written. If you cannot drop any message but have to process the latest first, then workers should be implemented like the dispatcher, with a stack instead of a queue. If you can drop non-latest messages, then do the work directly in the dispatcher (and ensure that Post and Receive are on different threads if you use the implementation from the blog post). - Mycobacterium
... I do not completely understand your use case. In my text there is probably no actionable answer to your question, but some building blocks and links to other useful ones. - Mycobacterium
Yes, I understand. I am new to F# and I have very little experience with actors. I got great food for thought from you and @7sharp9. Your idea of using a heap is also very tempting. I'll try to work on something and experiment. Hope we can stay in touch through this, I'd be happy to see how you proceed with the implementation. - Fraze
I am not an expert either and am learning it the hard way :) After rereading your question, another thought: if the number of workers is fixed and you need to start a worker with the latest message as soon as a worker is available, you could use a SemaphoreSlim in the dispatcher with the limit equal to the number of workers, plus some logic to pass a message to the released worker. - Mycobacterium
Hi, I am trying to implement an agent class to test an idea I had. My Post function works. Now it's time for me to work on PostAndReply. I had a look at #ZachBray's code and his PostAndReply function is not general enough. On the other hand, @Malikamalin's code uses an ad-hoc implementation based on TaskCompletionSource. Are there alternatives to this approach? Is the implementation of the original MailboxProcessor available out there? Thanks! - Fraze
The first link, with the Dragons :) FSharp.Actor has a slightly different but general implementation. Also check out this: github.com/rogeralsing/Pigeon, its development is very active right now. - Mycobacterium
I thought that the F# PowerPack was an add-on library to the standard core. I'll study the material. Thanks a lot. - Fraze

It sounds like you might need a destructive-scan version of the mailbox processor. I implemented this with TPL Dataflow in a blog series that you might be interested in.

My blog is currently down for maintenance but I can point you to the posts in markdown format.

Part1
Part2
Part3

You can also check out the code on github

I also wrote about the issues with scan in my lurking horror post

Hope that helps...

Malikamalin answered 29/1, 2014 at 14:30 Comment(3)
Hi. It is a really interesting read. I'll go through the material. Btw, I have a problem with the formatting of {% codeblock lang:fsharp %} and other tags. Is there a trick to get the proper layout? Many thanks. - Fraze
It's formatted with a Liquid plugin; GitHub just shows it with the standard markup processor. I'll be moving them to my current blog, I just have not had the time to do it yet. You can find parts 1 and 2 on the Wayback Machine too: web.archive.org/web/20131126185321/http://moiraesoftware.com/… web.archive.org/web/20131127003757/http://moiraesoftware.com/… - Malikamalin
@Malikamalin Thanks for the direct readable links! Yesterday I tried Google cache :) I tested DataFlowAgent on the same test as in Zach Bray's article, and DFA is ~4 times slower than the standard MBP on my machine. It spends 72% of the time in DataflowBlock.ReceiveAsync/DataflowBlock.Post, which are inside System.Threading.Tasks.Dataflow, and this exactly explains the 4x drop in speed. Is STTD slow or what? The snippet is here: fssnip.net/lt - Mycobacterium

The simplest solution is to greedily eat all messages in the inbox when one arrives and discard all but the most recent. Easily done using TryReceive:

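// Drain the mailbox without waiting, keeping only the newest message.
let rec readLatestLoop (inbox: MailboxProcessor<'T>) (oldMsg: 'T) : Async<'T> =
  async { let! newMsg = inbox.TryReceive 0   // timeout 0: None if the mailbox is empty
          match newMsg with
          | None -> return oldMsg
          | Some newMsg -> return! readLatestLoop inbox newMsg }
// Wait for at least one message, then skip ahead to the most recent one.
let readLatest (inbox: MailboxProcessor<'T>) =
  async { let! msg = inbox.Receive()
          return! readLatestLoop inbox msg }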
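
To see how this might be used inside an agent, here is a self-contained sketch. It restates the two helpers with the mailbox passed in explicitly as a parameter; firstSeen and the int message type are assumptions made for the example. The messages are posted before the agent is started only so that the outcome is deterministic.

```fsharp
open System.Threading.Tasks

// Drain the mailbox without waiting, keeping only the newest message.
let rec readLatestLoop (inbox: MailboxProcessor<'T>) (oldMsg: 'T) : Async<'T> =
    async {
        let! newMsg = inbox.TryReceive 0      // timeout 0: None if the mailbox is empty
        match newMsg with
        | None -> return oldMsg
        | Some newMsg -> return! readLatestLoop inbox newMsg
    }

// Wait for at least one message, then skip ahead to the most recent one.
let readLatest (inbox: MailboxProcessor<'T>) =
    async {
        let! msg = inbox.Receive()
        return! readLatestLoop inbox msg
    }

// Hypothetical hook so the caller can observe what the agent picked up first.
let firstSeen = TaskCompletionSource<int>()

let agent =
    new MailboxProcessor<int>(fun inbox ->
        async {
            let! latest = readLatest inbox    // older observations are discarded
            firstSeen.SetResult latest        // stand-in for the real processing
        })

// Five observations pile up before the agent gets a chance to look at them.
for i in 1 .. 5 do
    agent.Post i
agent.Start()

let result = firstSeen.Task.Result            // blocks until the agent has read
```

In a real agent the body would loop, calling readLatest again after each piece of work, so that every iteration starts from the freshest observation.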

When faced with the same problem, I architected a more sophisticated and efficient solution that I called cancellable streaming and described in an F# Journal article here. The idea is to start processing messages and then cancel that processing if they are superseded. This significantly improves concurrency if significant processing is being done.

Prophetic answered 4/2, 2014 at 16:10 Comment(0)