Nondeterministically interleaving conduit's Sources

I was hoping to see a nondeterministic interleaving operation for sources, with a type signature like

interleave :: WhateverIOMonadClassItWouldWant m => [(k, Source m a)] -> Source m (k, a)

The use case is that I have a p2p application that maintains open connections to many nodes on the network, and it is mostly just sitting around waiting for messages from any of them. When a message arrives, it doesn't care where it came from, but needs to process the message as soon as possible. In theory this kind of application (at least when used for socket-like sources) could bypass GHC's IO manager entirely and run the select/epoll/etc. calls directly, but I don't particularly care how it's implemented, as long as it works.

Is something like this possible with conduit? A less general but probably more feasible approach might be to write a [(k, Socket)] -> Source m (k, ByteString) function that handles receiving on all the sockets for you.

I noticed the ResumableSource operations in conduit, but they all seem to want to be aware of a particular Sink, which feels like a bit of an abstraction leak, at least for this operation.

Krystlekrystyna asked 14/7, 2012 at 6:58 · Comments (5)
How much better performance do you need than the simple solution of forkIO plus a Source wrapping a Chan/TChan? – Firecure
I don't necessarily need high performance (I anticipate connecting to around 1000 of these peers at most), but I'm also generally interested in these iteratee-like abstractions and was wondering whether this kind of operation could be provided with conduits. It seems like a reasonably common use case for certain kinds of network protocols. – Krystlekrystyna
I believe the event manager is about as close as you'll get without using select or epoll directly or through some package. I don't think the event manager exposes a polling interface (it wouldn't be much of an event manager if it did), so with most designs you'd end up with some number of threads and chans anyway. What I'd do is start with a Source wrapping a bounded TChan, initially forking one thread per connection. Move to the event manager if it's necessary for performance. If you're still having problems, there is always the FFI. – Firecure
That would work, but if we wanted an approach that avoided the overhead of GHC threads and chans, the GHC event manager does have an API at haskell.org/ghc/docs/latest/html/libraries/base/GHC-Event.html. The real question becomes whether a conduit Source can "push" values out or whether a Sink conceptually pulls them out. In the former case, the GHC.Event API could easily allow an efficient [(k, Socket)] -> Source m (k, ByteString) combinator. If Sources are passive, we'd need to queue up the results. – Krystlekrystyna
If you need to avoid threads and callbacks, you'll probably have to use a poll loop to stay consistent with the Source pull model. I'd at least start with the simple approach (forkIO/Chan) to set a benchmark and work from there as necessary. – Firecure

The stm-conduit package provides mergeSources, which does something similar (though not identical) to what you're looking for. It's probably a good place to start.

Swivet answered 14/7, 2012 at 18:45 · Comments (5)
Accept Michael's answer. It does what I described. – Selfcontent
Could anyone elaborate on why the type of mergeSources constrains the monad argument of the Sources it deals with to be of the form ResourceT m instead of any MonadResource instance? – Emir
I can't see a reason; it's worth filing an issue with the maintainer. You can likely work around this when calling mergeSources by using hoist liftResourceT. – Swivet
Thanks @MichaelSnoyman, I created an issue here: github.com/cgaebel/stm-conduit/issues/25. Following your suggestion, I was also able to write a suitable wrapper that changes the whole type so it no longer mentions ResourceT explicitly, which meets my needs. – Emir
This is now implemented in stm-conduit 2.5.1. – Emir

Yes, it is possible.

You can poll a number of Sources without blocking by forking one thread per Source; in each thread, pair the Source with a Sink that sends its output to a concurrent channel:

concur :: (WhateverIOMonadClassItWouldWant m) => TChan a -> Sink a m ()
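A minimal sketch of `concur`, assuming conduit ≥ 1.3 (where the old `Sink a m` is spelled `ConduitT a Void m`) and the `stm` package. It returns `()` because a sink that only drains its input into a channel has no other result to produce. Each polling thread would then run something like `forkIO (runConduit (src .| concur chan))`.

```haskell
import Control.Concurrent.STM
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Void (Void)

-- | Sink that forwards every element it receives to a shared TChan.
-- This is the conduit >= 1.3 spelling of the old @Sink a m ()@ type.
concur :: MonadIO m => TChan a -> ConduitT a Void m ()
concur chan = CL.mapM_ (liftIO . atomically . writeTChan chan)
```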

... and then you define a Source that reads from that channel:

synchronize :: (WhateverIOMonadClassItWouldWant m) => TChan a -> Source m a
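A matching sketch of `synchronize`, under the same assumptions (conduit ≥ 1.3, where a `Source m a` is `ConduitT () a m ()`, plus `stm`). This version never ends on its own: it blocks on the channel and leaves it to the downstream consumer to stop pulling, so signalling end-of-stream would need something extra, such as a `Maybe` sentinel.

```haskell
import Control.Concurrent.STM
import Control.Monad (forever)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Conduit
import qualified Data.Conduit.List as CL

-- | Source that yields whatever arrives on the channel, blocking while it is empty.
-- It never terminates by itself; the downstream consumer decides when to stop.
synchronize :: MonadIO m => TChan a -> ConduitT () a m ()
synchronize chan = forever $ do
  x <- liftIO (atomically (readTChan chan))
  yield x
```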

Note that this is no different from forking threads to poll the sockets directly, but because it is more general, it would also be useful to other conduit users who want to poll things other than sockets through Sources they have defined themselves.

If you combined those capabilities into one function, the overall API would look something like:

poll :: (WhateverIOMonadClassItWouldWant m) => [Source m a] -> m (Source m a)

... but you can still throw in those ks if you want.
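Putting the two halves together, here is a sketch of the keyed version the question asks for, assuming conduit ≥ 1.3 and `stm`. The names `interleave` and `drain` are illustrative, not an existing library API. In the spirit of the bounded TChan suggested in the comments, it uses a bounded `TBQueue` and writes `Nothing` as a per-producer end marker, so the merged source terminates once every input source has finished. The input sources run in `IO` to keep the threading simple.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Exception (finally)
import Control.Monad (forM_)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Conduit
import qualified Data.Conduit.List as CL

-- | Hypothetical helper: run each keyed source on its own thread and merge
-- the tagged outputs into one source, in whatever order they arrive.
interleave :: MonadIO m => [(k, ConduitT () a IO ())] -> m (ConduitT () (k, a) m ())
interleave sources = liftIO $ do
  -- Bounded queue: a fast producer blocks instead of growing memory without limit.
  queue <- newTBQueueIO 64
  forM_ sources $ \(k, src) -> forkIO $
    -- Tag every element with its key and push it onto the shared queue.
    runConduit (src .| CL.mapM_ (\a -> atomically (writeTBQueue queue (Just (k, a)))))
      -- Write the end marker even if the source throws, so the reader never hangs.
      `finally` atomically (writeTBQueue queue Nothing)
  pure (drain queue (length sources))

-- | Hypothetical helper: yield queued items until every producer has reported it is done.
drain :: MonadIO m => TBQueue (Maybe o) -> Int -> ConduitT i o m ()
drain queue remaining
  | remaining <= 0 = pure ()
  | otherwise = do
      next <- liftIO (atomically (readTBQueue queue))
      case next of
        Nothing -> drain queue (remaining - 1)
        Just x -> yield x >> drain queue remaining
```

Ordering across sources is whatever order the threads happen to write in, but elements from any single source keep their relative order, since each producer writes sequentially into the same FIFO queue. Exceptions inside a producer thread are not propagated to the consumer; `finally` only guarantees that the merged source still terminates.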

Selfcontent answered 14/7, 2012 at 14:0 · Comments (0)
