I am trying to create a conduit that can consume multiple input streams. I need to be able to await on one or the other of the input streams in no particular order (e.g., not alternating) making zip useless. There is nothing parallel or non-deterministic going on here: I await on one stream or the other. I want to be able to write code similar to the following (where awaitA
and awaitB
await on the first or second input stream respectively):
do
_ <- awaitA
x <- awaitA
y <- awaitB
yield (x,y)
_ <- awaitB
_ <- awaitB
y' <- awaitB
yield (x,y')
The best solution I have is to make the inner monad another conduit, e.g.
foo :: Sink i1 (ConduitM i2 o m) ()
Which then allows
awaitA = await
awaitB = lift await
And this mostly works. Unfortunately, this seems to make it very difficult to fuse to the inner conduit before the outer conduit is fully connected. The first thing I tried was:
fuseInner :: Monad m =>
Conduit i2' m i2 ->
Sink i1 (ConduitM i2 o m) () ->
Sink i1 (ConduitM i2' o m) ()
fuseInner x = transPipe (x =$=)
But this doesn't work, at least when x
is stateful since (x =$=)
is run multiple times, effectively restarting x
each time.
Is there any way to write fuseInner, short of breaking into the internals of conduit (which looks like it would be pretty messy)? Is there some better way to handle multiple input streams? Am I just way to far beyond what conduit was designed for?
Thanks!
IO
input streams. Usestm-conduit
to do this. – Nuskuconduit
version and submit it as an answer. – Nuskufoo
is aSink
over aConduitM
monad). The problem is, I can't figure out how to fuse to the inner conduit with this strategy. – LetteResumableSource
s. For each of the sources, start with(rSource, value) <- source $$+ head
, followed by(rSource', value) <- rSource $$++ head
each time you want to read from that stream. When a stream is exhausted or no longer needed, userSource $$+- return ()
(orhead
if you know this is the last value) to finalize it (perform cleanup operations). Unfortunately, this means that you can't use it with arbitrary conduits; only sources. – Melisandra