I'm trying to build a streaming library using the abstractions described in the paper "Faster coroutine pipelines". I've modified the code so that it handles pipeline exit correctly (instead of throwing an error when that happens):
-- | r: return type of the continuation, i: input stream type, o: output stream type,
-- m: underlying monad, a: return type
newtype ContPipe r i o m a = MakePipe {runPipe :: (a -> Result r m i o) -> Result r m i o}
  deriving
    ( Functor,
      Applicative,
      Monad
    )
    via (Cont (Result r m i o))
type Result r m i o = InCont r m i -> OutCont r m o -> m r
newtype InCont r m i = MakeInCont {resumeIn :: OutCont r m i -> m r}
newtype OutCont r m o = MakeOutCont {resumeOut :: Maybe o -> InCont r m o -> m r}
suspendIn :: Result r m i o -> InCont r m i -> InCont r m o
suspendIn k ik = MakeInCont \ok -> k ik ok
suspendOut :: (Maybe i -> Result r m i o) -> OutCont r m o -> OutCont r m i
suspendOut k ok = MakeOutCont \v ik -> k v ik ok
emptyIk :: InCont r m a
emptyIk = MakeInCont \ok -> resumeOut ok Nothing emptyIk
await :: ContPipe r i o m (Maybe i)
await = MakePipe \k ik ok -> resumeIn ik (suspendOut k ok)
yield :: o -> ContPipe r i o m ()
yield v = MakePipe \k ik ok -> resumeOut ok (Just v) (suspendIn (k ()) ik)
(.|) :: forall r i e o m a. ContPipe r i e m () -> ContPipe r e o m a -> ContPipe r i o m a
p .| q = MakePipe \k ik ok ->
  runPipe
    q
    (\a _ ok' -> k a emptyIk ok')
    (suspendIn (runPipe p (\() -> f)) ik)
    ok
  where
    f :: Result r m i e
    f _ ok = resumeOut ok Nothing emptyIk
runContPipe :: forall m a. Applicative m => ContPipe a () Void m a -> m a
runContPipe p = runPipe p (\a _ _ -> pure a) ik ok
  where
    ik :: InCont a m ()
    ik = MakeInCont \ok' -> resumeOut ok' (Just ()) ik
    ok :: OutCont a m Void
    ok = MakeOutCont \_ ik' -> resumeIn ik' ok
I would like to implement a function
fork :: ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
that combines two consumer streams into one (similar to conduit's ZipSink). It should have the following semantics:
- If both streams are still running and accepting input, feed the same input value to both streams.
- If one stream has exited, store its return value and keep feeding input to the stream that is still accepting it.
- If both streams have exited, exit with the return values of both streams put into a tuple.
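To make the three rules concrete, here is a minimal sketch in a much simpler model than ContPipe. The Sink type and the names forkSink, feed, takeSink and sumSink are hypothetical and exist only for this illustration; they say nothing about how fork should be written against the continuation-based types.

```haskell
-- Hypothetical model of a consumer: either finished with a result, or
-- waiting for the next input (Nothing signals that the input has ended).
data Sink i a
  = Done a
  | More (Maybe i -> Sink i a)

-- Combine two sinks following the three rules listed above.
forkSink :: Sink i a -> Sink i b -> Sink i (a, b)
forkSink (Done a) (Done b) = Done (a, b) -- both exited: return the tuple
forkSink (Done a) (More g) = More (\v -> forkSink (Done a) (g v)) -- keep a, feed q
forkSink (More f) (Done b) = More (\v -> forkSink (f v) (Done b)) -- keep b, feed p
forkSink (More f) (More g) = More (\v -> forkSink (f v) (g v)) -- feed both

-- Feed the list elements, then Nothing repeatedly until the sink finishes.
feed :: [i] -> Sink i a -> a
feed _ (Done a) = a
feed [] (More f) = feed [] (f Nothing)
feed (x : xs) (More f) = feed xs (f (Just x))

-- Example sink: collect at most n inputs.
takeSink :: Int -> Sink i [i]
takeSink n = go n []
  where
    go k acc
      | k <= 0 = Done (reverse acc)
      | otherwise = More step
      where
        step Nothing = Done (reverse acc)
        step (Just x) = go (k - 1) (x : acc)

-- Example sink: sum every input until the input ends.
sumSink :: Sink Int Int
sumSink = go 0
  where
    go acc = More step
      where
        step Nothing = Done acc
        step (Just x) = go (acc + x)
```

In this model, feed [1, 2, 3] (forkSink (takeSink 2) sumSink) evaluates to ([1, 2], 6): the first sink exits after two inputs, its result is kept, and the second sink still sees every input.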
Here's my attempt:
We reuse the loop function from the paper, which connects an InCont r m i to two OutCont r m i continuations and actively resumes them.
loop :: InCont r m i -> OutCont r m i -> OutCont r m i -> m r
loop ik ok1 ok2 =
  resumeIn ik $ MakeOutCont \v ik' ->
    resumeOut ok1 v $ MakeInCont \ok1' ->
      resumeOut ok2 v $ MakeInCont \ok2' -> loop ik' ok1' ok2'
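To see the order in which loop resumes the two output continuations, the following sketch runs it on a small list source with two logging sinks. The helpers fromList, logSink and demo are hypothetical names introduced only for this example, and the Writer monad is used purely to record the trace.

```haskell
{-# LANGUAGE BlockArguments #-}

import Control.Monad.Trans.Writer (Writer, execWriter, tell)

-- Continuation types and loop, repeated so the sketch compiles on its own.
newtype InCont r m i = MakeInCont {resumeIn :: OutCont r m i -> m r}

newtype OutCont r m o = MakeOutCont {resumeOut :: Maybe o -> InCont r m o -> m r}

loop :: InCont r m i -> OutCont r m i -> OutCont r m i -> m r
loop ik ok1 ok2 =
  resumeIn ik $ MakeOutCont \v ik' ->
    resumeOut ok1 v $ MakeInCont \ok1' ->
      resumeOut ok2 v $ MakeInCont \ok2' -> loop ik' ok1' ok2'

-- Hypothetical source: emits the list elements, then Nothing forever.
fromList :: [i] -> InCont r m i
fromList xs = MakeInCont \ok -> case xs of
  [] -> resumeOut ok Nothing (fromList [])
  x : rest -> resumeOut ok (Just x) (fromList rest)

-- Hypothetical sink: logs each input under a label and returns on Nothing.
logSink :: Show i => String -> OutCont () (Writer [String]) i
logSink name = MakeOutCont \v ik -> case v of
  Nothing -> pure ()
  Just x -> tell [name ++ show x] >> resumeIn ik (logSink name)

-- Trace of one run: each value reaches the first sink, then the second.
demo :: [String]
demo = execWriter (loop (fromList [1, 2 :: Int]) (logSink "p") (logSink "q"))
-- demo == ["p1", "q1", "p2", "q2"]
```

Note that in this sketch the whole run ends as soon as the first sink returns on Nothing, so the second sink never observes the end of input. Whatever result a continuation returns is the result of the entire computation.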
With loop we can connect the input of the resulting pipe to both pipes simultaneously. The output is shared between the two pipes (this doesn't really matter, since you cannot yield a Void).
fork :: forall r m i a b. ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
fork p q =
  MakePipe \k ik ok ->
    let f :: a -> Result r m i Void
        f a ik' ok' = _
        g :: b -> Result r m i Void
        g b ik' ok' = _
     in runPipe
          p
          f
          (MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok)
          ok
Now we just need to fill in the continuations f and g, which are called by p and q when they exit.
If g has already been called when f is called, meaning q has exited, then f should call the continuation k. If g hasn't been called yet, then f should store the return value a and resume the input continuation, discarding all of the values passed to it.
It seems to me that this isn't possible without some form of shared state. We could try to store the state in m using a state monad:
fork :: forall r m i a b. MonadState (Maybe (Either a b)) m => ContPipe r i Void m a -> ContPipe r i Void m b -> ContPipe r i Void m (a, b)
fork p q =
  MakePipe \k ik ok ->
    let f :: a -> Result r m i Void
        f a ik' ok' = do
          s <- get
          case s of
            Nothing -> do
              put (Just (Left a))
              resumeIn ik' sinkOk
            Just (Right b) -> do
              k (a, b) ik' ok'
            _ -> error "unexpected state"
        g :: b -> Result r m i Void
        g b ik' ok' = do
          s <- get
          case s of
            Nothing -> do
              put (Just (Right b))
              resumeIn ik' sinkOk
            Just (Left a) -> do
              k (a, b) ik' ok'
            _ -> error "unexpected state"
     in runPipe
          p
          f
          (MakeInCont \ok1 -> runPipe q g (MakeInCont \ok2 -> loop ik ok1 ok2) ok)
          ok
sinkOk is an output continuation that discards all of its inputs:
sinkOk :: OutCont r m o
sinkOk = MakeOutCont \_ ik -> resumeIn ik sinkOk
We can now add some auxiliary functions for testing:
print' :: MonadIO m => Show i => ContPipe r i o m ()
print' = do
  m <- await
  case m of
    Nothing -> pure ()
    Just i -> do
      lift $ liftIO (print i)
      print'
upfrom :: Int -> ContPipe r i Int m a
upfrom i = do
  yield i
  upfrom (i + 1)
take' :: Int -> ContPipe r i i m ()
take' n
  | n <= 0 = pure ()
  | otherwise = do
      m <- await
      case m of
        Nothing -> pure ()
        Just i -> do
          yield i
          take' (n - 1)
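The print' helper calls lift, which presumes a MonadTrans instance for ContPipe r i o that is not shown in the definitions above. One plausible definition is sketched below as an assumption, not as the instance actually in use. The Functor, Applicative and Monad instances are written by hand here only so that the sketch compiles without the DerivingVia clause.

```haskell
{-# LANGUAGE BlockArguments #-}

import Control.Monad.Trans.Class (MonadTrans (..))

-- Types repeated from above so that the sketch compiles on its own.
type Result r m i o = InCont r m i -> OutCont r m o -> m r

newtype InCont r m i = MakeInCont {resumeIn :: OutCont r m i -> m r}

newtype OutCont r m o = MakeOutCont {resumeOut :: Maybe o -> InCont r m o -> m r}

newtype ContPipe r i o m a = MakePipe {runPipe :: (a -> Result r m i o) -> Result r m i o}

-- Hand-written instances standing in for the DerivingVia clause.
instance Functor (ContPipe r i o m) where
  fmap f p = MakePipe \k -> runPipe p (k . f)

instance Applicative (ContPipe r i o m) where
  pure a = MakePipe \k -> k a
  pf <*> pa = MakePipe \k -> runPipe pf \f -> runPipe pa (k . f)

instance Monad (ContPipe r i o m) where
  p >>= f = MakePipe \k -> runPipe p \a -> runPipe (f a) k

-- Assumed instance: run the effect in m, then pass its result to the
-- continuation, leaving the input and output continuations untouched.
instance MonadTrans (ContPipe r i o) where
  lift m = MakePipe \k ik ok -> m >>= \a -> k a ik ok
```

With an instance along these lines, lift $ liftIO (print i) in print' runs the print action in the underlying monad between two awaits.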
This does work in the case where p exits earlier than q:
flip evalStateT Nothing $ runContPipe $ upfrom 1 .| take' 3 .| fork print' print'
gives the desired output:
1
1
2
2
3
3
((),())
But it goes into an infinite loop when q exits earlier than p:
flip evalStateT Nothing $ runContPipe $ upfrom 1 .| take' 3 .| fork print' (take' 2 .| print')
outputs:
1
1
2
2
<loops>
fork receives two "empty" sources that never produce anything? – Penny
The way .| works is that if the upstream exited early, replace the upstream with a source that only spits out Nothings (emptyIk). – Jillion