I was hoping to see a nondeterministic interleaving operation for sources, with a type signature like
interleave :: WhateverIOMonadClassItWouldWant m => [(k, Source m a)] -> Source m (k, a)
The use case is that I have a p2p application that maintains open connections to many nodes on the network and mostly just sits around waiting for messages from any of them. When a message arrives, it doesn't care where it came from, but it needs to process the message as soon as possible. In theory, this kind of application (at least when used with socket-like sources) could bypass GHC's IO manager entirely and run the select/epoll/etc. calls directly, but I don't particularly care how it's implemented, as long as it works.
Is something like this possible with conduit? A less general but probably more feasible approach might be to write a [(k, Socket)] -> Source m (k, ByteString) function that handles receiving on all the sockets for you.
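As a rough sketch of that function (not an existing conduit API), one could fork one reader thread per connection and merge their chunks through a single Chan from base. The name receiveAll is hypothetical, and to keep the sketch dependency-free it makes two assumptions: each connection is passed as a receive action that returns an empty ByteString at end of stream (B.hGetSome h 4096 on a Handle behaves this way, and the network package's socketToHandle can turn a Socket into a Handle), and the result is a plain pull action in IO rather than a conduit Source.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Exception (finally)
import Control.Monad (forM_, unless)
import qualified Data.ByteString as B
import Data.IORef

-- | Hypothetical stand-in for [(k, Socket)] -> Source m (k, ByteString).
-- Each connection is a receive action that returns an empty ByteString at
-- end of stream (e.g. B.hGetSome h 4096 on a Handle). The result is a pull
-- action: Just (key, chunk) per chunk in arrival order, Nothing once every
-- connection has finished. Assumes a single consumer.
receiveAll :: [(k, IO B.ByteString)] -> IO (IO (Maybe (k, B.ByteString)))
receiveAll conns = do
  -- Unbounded channel: a consumer that falls behind lets it grow freely.
  -- 'Nothing' entries are end-of-stream markers, one per connection.
  chan <- newChan
  forM_ conns $ \(key, recvChunk) -> do
    let loop = do
          -- For a real connection this blocks until data or EOF arrives.
          chunk <- recvChunk
          unless (B.null chunk) $ do
            writeChan chan (Just (key, chunk))
            loop
    -- One reader thread per connection; its end marker is written even
    -- if the receive action throws.
    forkIO (loop `finally` writeChan chan Nothing)
  -- Number of connections that have not reported end of stream yet.
  remaining <- newIORef (length conns)
  let next = do
        n <- readIORef remaining
        if n == 0
          then pure Nothing
          else do
            msg <- readChan chan
            case msg of
              Just tagged -> pure (Just tagged)
              Nothing -> do
                -- One connection finished; keep waiting for the others.
                modifyIORef' remaining (subtract 1)
                next
  pure next
```

To expose this as a conduit Source, a loop that calls the returned action and yields each Just item (with Data.Conduit's yield) until it sees Nothing should be enough. Chunks from one connection keep their order, while chunks from different connections appear in whatever order the reader threads write them.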
I noticed the ResumableSource operations in conduit, but they all seem to want to be aware of a particular Sink, which feels like a bit of an abstraction leak, at least for this operation.
… select or epoll directly or through some package. I don't think there is a polling interface exposed from the event manager (it wouldn't be much of an event manager if it did), so you'd end up with some number of threads and chans with most designs anyway. What I'd do is start with a Source wrapping a bounded TChan, forking a thread per connection initially. Move to the event manager if it's necessary for performance. If you're still having problems, there is always FFI. – Firecure
… Source can "push" values out or whether a sink conceptually is pulling them out. In the former case, the GHC.Event API could easily allow an efficient [(k, Socket)] -> Source m (k, ByteString) combinator. If they're passive, then we'd need to queue up the results. – Krystlekrystyna
… Source pull model. I'd at least start with the simple approach (forkIO/Chan) to set a benchmark and work from there as necessary. – Firecure
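The bounded design suggested in the first comment (one thread per connection feeding a bounded queue that the consumer drains) might look roughly like the sketch below. It uses only base and stm; stm has no bounded TChan, so its TBQueue stands in for one, and each source is modelled as a hypothetical pull action returning Nothing when exhausted rather than as a conduit Source. The name interleave mirrors the question's signature, but these types are an assumption.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically, newTBQueueIO, readTBQueue, writeTBQueue)
import Control.Exception (finally)
import Control.Monad (forM_)
import Data.IORef

-- | Merge several sources into one, tagging each element with its key.
-- A source is a hypothetical pull action: Just x for the next element,
-- Nothing once it is exhausted. 'bound' is the queue capacity (> 0).
-- The returned action assumes a single consumer.
interleave :: Int -> [(k, IO (Maybe a))] -> IO (IO (Maybe (k, a)))
interleave bound sources = do
  -- Bounded queue standing in for a bounded TChan.
  -- 'Nothing' entries are end-of-source markers, one per producer.
  queue <- newTBQueueIO (fromIntegral bound)
  forM_ sources $ \(key, pull) -> do
    let pump = do
          item <- pull
          case item of
            Nothing -> pure ()
            Just x -> do
              -- Blocks while the queue is full, so a slow consumer
              -- throttles the producers instead of growing memory.
              atomically (writeTBQueue queue (Just (key, x)))
              pump
    -- One thread per source; the end marker is written even on exceptions.
    forkIO (pump `finally` atomically (writeTBQueue queue Nothing))
  -- Number of sources that have not reported exhaustion yet.
  remaining <- newIORef (length sources)
  let next = do
        n <- readIORef remaining
        if n == 0
          then pure Nothing
          else do
            msg <- atomically (readTBQueue queue)
            case msg of
              Just tagged -> pure (Just tagged)
              Nothing -> do
                -- One more source finished; keep waiting for the rest.
                modifyIORef' remaining (subtract 1)
                next
  pure next
```

The bound is what distinguishes this from the plain forkIO/Chan baseline: when the consumer falls behind, writeTBQueue blocks the producer threads instead of letting the buffer grow. If the consumer stops reading early, those threads stay blocked, so a fuller version would probably need to kill them (for example with killThread) when the Source is closed.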