I am learning about F# agents (MailboxProcessor).
I am dealing with a rather unconventional problem.
- I have one agent (dataSource) which is a source of streaming data. The data has to be processed by an array of agents (dataProcessor). We can consider dataProcessor as some sort of tracking device.
- Data may flow in faster than dataProcessor is able to process its input.
- Some delay is acceptable. However, I have to ensure that the agent stays on top of its work and does not get buried under obsolete observations.
I am exploring ways to deal with this problem.
The first idea is to implement a stack (LIFO) in dataSource. dataSource would send over the latest available observation whenever dataProcessor becomes available to receive and process data. This solution may work, but it could get complicated: dataProcessor may need to be blocked and reactivated, and it has to communicate its status to dataSource, which leads to a two-way communication problem. This may boil down to a blocking queue in the producer-consumer problem, but I am not sure.
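One way to keep the two-way communication of the first idea manageable is to let the processor's request for work double as its status signal. The sketch below is a minimal illustration, assuming a single dataProcessor and integer observations; SourceMsg, makeSource and runProcessor are hypothetical names, not part of any library. The source keeps only the newest undelivered observation (the top of the stack; everything beneath it is obsolete and simply overwritten), and a free processor asks for work with PostAndAsyncReply. If nothing new is available, the request is parked until the next observation arrives, so the processor never has to be blocked or reactivated explicitly: awaiting the reply is the blocking.

```fsharp
// Hypothetical message type for the source agent (int stands in for an observation).
type SourceMsg =
    | Push of int                          // a new observation arrives from the stream
    | Request of AsyncReplyChannel<int>    // a free processor asks for work

// Source agent that holds only the newest observation not yet handed out.
// Assumes exactly one processor, so at most one request is ever parked.
let makeSource () =
    MailboxProcessor<SourceMsg>.Start(fun inbox ->
        // latest  : newest undelivered observation (older ones are overwritten)
        // waiting : the processor's parked request while nothing new is available
        let rec loop (latest: int option) (waiting: AsyncReplyChannel<int> option) =
            async {
                let! msg = inbox.Receive()
                match msg, latest, waiting with
                | Push x, _, Some ch ->
                    ch.Reply x                    // processor is idle: deliver at once
                    return! loop None None
                | Push x, _, None ->
                    return! loop (Some x) None    // overwrite any stale observation
                | Request ch, Some x, _ ->
                    ch.Reply x                    // hand out the newest observation
                    return! loop None None
                | Request ch, None, _ ->
                    return! loop None (Some ch)   // park the request until the next Push
            }
        loop None None)

// A processor pulls work only when it is free, so no backlog builds up on its side.
let runProcessor (source: MailboxProcessor<SourceMsg>) (handle: int -> Async<unit>) =
    async {
        while true do
            let! obs = source.PostAndAsyncReply(fun ch -> Request ch)
            do! handle obs
    }
```

With an array of processors, one such "latest value" slot per processor (or a list of parked requests) would be needed; that extension is left out to keep the sketch short.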
The second idea is to have dataProcessor take care of message sorting. In this architecture, dataSource simply posts updates to dataProcessor's queue, and dataProcessor uses Scan to fetch the latest data available in its queue. This may be the way to go. However, I am not sure whether the current design of MailboxProcessor makes it possible to clear the message queue by deleting the older, obsolete messages. Furthermore, here, it is written that:
Unfortunately, the TryScan function in the current version of F# is broken in two ways. Firstly, the whole point is to specify a timeout but the implementation does not actually honor it. Specifically, irrelevant messages reset the timer. Secondly, as with the other Scan function, the message queue is examined under a lock that prevents any other threads from posting for the duration of the scan, which can be an arbitrarily long time. Consequently, the TryScan function itself tends to lock-up concurrent systems and can even introduce deadlocks because the caller's code is evaluated inside the lock (e.g. posting from the function argument to Scan or TryScan can deadlock the agent when the code under the lock blocks waiting to acquire the lock it is already under).
Having the latest observation bounced back may be a problem. The author of this post, @Jon Harrop, writes:
I managed to architect around it and the resulting architecture was actually better. In essence, I eagerly Receive all messages and filter using my own local queue.
This idea is surely worth exploring, but before starting to play around with code, I would welcome some input on how I could structure my solution.
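The "eagerly Receive everything and filter locally" approach might be sketched as below. This is one possible reading of that suggestion, not Jon Harrop's actual code, and ProcMsg, Flush and makeProcessor are hypothetical names. After the first message arrives, the agent receives everything already queued (checking CurrentQueueLength so that Receive never waits during the drain), keeps only the newest observation, and processes that one. Flush exists only so a caller can wait until everything posted before it has been dealt with. No Scan or TryScan is used, so no caller code is evaluated under the mailbox lock.

```fsharp
// Hypothetical message type for a processor agent (int stands in for an observation).
type ProcMsg =
    | Observation of int
    | Flush of AsyncReplyChannel<unit>   // reply once everything posted before it is handled

let makeProcessor (handle: int -> Async<unit>) =
    MailboxProcessor<ProcMsg>.Start(fun inbox ->
        // Receive every message that is already queued. Receive does not wait here
        // because only this agent reads the mailbox and the queue is non-empty.
        let rec drain (newest: int option) (flushes: AsyncReplyChannel<unit> list) =
            async {
                if inbox.CurrentQueueLength = 0 then
                    return (newest, flushes)
                else
                    let! msg = inbox.Receive()
                    match msg with
                    | Observation x -> return! drain (Some x) flushes   // later beats earlier
                    | Flush ch -> return! drain newest (ch :: flushes)
            }
        let rec loop () =
            async {
                // Wait for at least one message...
                let! first = inbox.Receive()
                let newest0, flushes0 =
                    match first with
                    | Observation x -> Some x, []
                    | Flush ch -> None, [ ch ]
                // ...then eagerly pull in the backlog and keep only the newest observation.
                let! newest, flushes = drain newest0 flushes0
                match newest with
                | Some x -> do! handle x
                | None -> ()
                for ch in List.rev flushes do
                    ch.Reply()
                return! loop ()
            }
        loop ())
```

Observations that arrive while handle is running wait in the mailbox and are skipped on the next pass, so the processor always works on the most recent observation rather than the oldest one.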
Thank you.
TryScan timeout bug in F# 3.1.1 and it has been fixed. – Prophetic