Data.Vector.Stream
provides a nice Stream
implementation that is very efficient thanks to the focus on fusability (see this paper for more). Since vector-0.1
this Stream
implementation changed slightly by moving the Step
type into a monad. (Now, the implementation is located in Data.Vector.Fusion.Stream.Monadic
.)
In a nutshell, here's the definition of Stream
:
data Step s a where
    Yield :: a -> s -> Step s a
    Skip  :: s -> Step s a
    Done  :: Step s a
data Stream a = forall s. Stream (s -> Step s a) s
Step s a
encapsulates the three possible results from one iteration of state s
with update function s -> Step s a
. The stream is either Done
, or skips output, or yields output. (The definition above uses GADT syntax, but that's not relevant here.)
Simple applications of this Stream
are:
empty :: Stream a
empty = Stream (const Done) ()
singleton :: a -> Stream a
singleton x = Stream step True
  where
    step True  = Yield x False
    step False = Done
fromList :: [a] -> Stream a
fromList zs = Stream step zs
  where
    step (x:xs) = Yield x xs
    step []     = Done
Strict left fold is done like this:
foldl'S :: (a -> b -> a) -> a -> Stream b -> a
foldl'S f a0 (Stream step s) = go a0 s
  where
    go a s = a `seq`
        case step s of
            Yield b s' -> go (f a b) s'
            Skip s'    -> go a s'
            Done       -> a
and that gives the usual list-like functions lengthS = foldl'S (\n _ -> n+1) 0
, etc. This certainly isn't as elegant as Conduit
or Pipes
, but it's simple and fast. So far so good.
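None of the examples so far actually produce a Skip, so here is a minimal, self-contained sketch that puts the pieces together and adds a filter. It uses ordinary constructor syntax instead of GADT syntax, which is equivalent here. The names `filterS` and `lengthS` are my own choices for this sketch, not functions taken from the vector package.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

-- One iteration either yields a value, skips, or finishes.
data Step s a = Yield a s | Skip s | Done

-- A stream hides its state type behind an existential.
data Stream a = forall s. Stream (s -> Step s a) s

fromList :: [a] -> Stream a
fromList zs = Stream step zs
  where
    step (x:xs) = Yield x xs
    step []     = Done

-- Strict left fold, as in the text above.
foldl'S :: (a -> b -> a) -> a -> Stream b -> a
foldl'S f a0 (Stream step s0) = go a0 s0
  where
    go a s = a `seq` case step s of
      Yield b s' -> go (f a b) s'
      Skip s'    -> go a s'
      Done       -> a

lengthS :: Stream a -> Int
lengthS = foldl'S (\n _ -> n + 1) 0

-- Hypothetical helper: rejected elements become Skip, so the state
-- type stays the same and no inner loop is needed.
filterS :: (a -> Bool) -> Stream a -> Stream a
filterS p (Stream step s0) = Stream step' s0
  where
    step' s = case step s of
      Yield a s'
        | p a       -> Yield a s'
        | otherwise -> Skip s'
      Skip s'       -> Skip s'
      Done          -> Done
```

With these definitions, `lengthS (filterS even (fromList [1 .. 10 :: Int]))` evaluates to 5.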
Now, let's try to aggregate 'low-level' streams into higher-level ones. For example, if you have a bitstream Stream Bool
you may want to decode the bits to yield a Stream Int
by using some clever codec. Of course it's always possible to build up a new step function s -> Step s b
from the step
function extracted from a given Stream step s
. Repeated applications of the step :: s->Step s a
function result in awkward case (step s) of ...
cascades that handle the three possibilities Done
, Skip
, Yield
, over and over again. Ideally, the aggregate
should look like this:
aggregate :: Stream a -> M?? -> Stream b
newStream = aggregate oldStream $ do
    a1 <- get -- a1 :: a
    if a1 == True then doSomething
        else do
            a2 <- get
            -- etc.
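For contrast, this is roughly what the hand-written approach described above looks like. It is only a sketch: `pairWith` and `decode2` are hypothetical names, and the two-bit "codec" is invented purely for illustration. Even this tiny decoder needs the Done / Skip / Yield case analysis once per buffered-input state, which is the cascade the do-notation is meant to hide.

```haskell
{-# LANGUAGE ExistentialQuantification #-}

data Step s a = Yield a s | Skip s | Done

data Stream a = forall s. Stream (s -> Step s a) s

fromList :: [a] -> Stream a
fromList zs = Stream step zs
  where
    step (x:xs) = Yield x xs
    step []     = Done

toList :: Stream a -> [a]
toList (Stream step s0) = go s0
  where
    go s = case step s of
      Yield a s' -> a : go s'
      Skip s'    -> go s'
      Done       -> []

-- Combine every two consecutive inputs into one output. The new state
-- pairs the inner state with an optionally buffered first element.
pairWith :: (a -> a -> b) -> Stream a -> Stream b
pairWith f (Stream step s0) = Stream step' (Nothing, s0)
  where
    -- Nothing buffered yet: fetch the first element of the pair.
    step' (Nothing, s) = case step s of
      Yield a s' -> Skip (Just a, s')
      Skip s'    -> Skip (Nothing, s')
      Done       -> Done
    -- First element buffered: fetch the second and emit the result.
    step' (Just a1, s) = case step s of
      Yield a2 s' -> Yield (f a1 a2) (Nothing, s')
      Skip s'     -> Skip (Just a1, s')
      Done        -> Done  -- a trailing unpaired element is dropped

-- Invented two-bit codec: read (high bit, low bit) as a number 0..3.
decode2 :: Stream Bool -> Stream Int
decode2 = pairWith (\hi lo -> 2 * fromEnum hi + fromEnum lo)
```

For example, `toList (decode2 (fromList [True, False, True, True]))` gives `[2,3]`. A decoder that reads a variable number of bits would need one such case block per intermediate state.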
The M??
is some monad, to be defined. Let's try a type Appl s a
:
newtype Appl s a = Appl ((s->Step s a) -> s -> Step s a)
It's called Appl
because it has the signature of a function application. The monad instance is quite straightforward:
instance Monad (Appl s) where
    return a = Appl (\_ s -> Yield a s)
    (Appl ap) >>= f = Appl (\step s ->
        case ap step s of
            Done -> Done
            Skip s' -> untilNotSkip step s'
            Yield a' s' -> let Appl ap' = f a' in ap' step s')
where untilNotSkip :: (s->Step s a) -> s -> Step s a
is just repeated (nested) application of a step :: (s->Step s a)
until a Done
or a Yield
is returned.
The get
function is just normal function application:
get :: Appl s a
get = Appl (\step s -> step s)
To tie things up, Functor
and Applicative
need to be done, and here comes the problem: Appl s
can't be made a functor. The signature is
fmap :: (a->b) -> Appl s a -> Appl s b
and that just doesn't work because in order to make a function ((s->Step s b) -> s -> Step s b)
from a function ((s->Step s a) -> s -> Step s a)
I'd need a b->a
. I can pull back an Appl s b
over an a->b
, but I can't push forward an Appl s a
, i.e. I can have a contravariant functor but not a (covariant) functor. That's weird. Streams are quite naturally comonads, but I don't see the connection. The purpose of Appl
is to turn a step function s->Step s a
into another one s->Step s b
.
Something is very wrong here: Appl
isn't the right "M??
". Can anyone help out?
Update
As pointed out by Li-yao Xia, the type should be something like
data Walk a b = forall s. Walk ((s->Step s a) -> s -> Step s b)
And the Functor, Applicative and Monad instances would be
instance Functor (Step s) where
    fmap _ Done = Done
    fmap _ (Skip s) = Skip s
    fmap f (Yield a s) = Yield (f a) s
instance Functor (Walk a) where
    fmap f (Walk t) = Walk (\step s -> fmap f (t step s))
-- default Applicative given a monad instance
ap :: (Monad m) => m (a -> b) -> m a -> m b
ap mf m = do
    f <- mf
    x <- m
    return (f x)
untilNotSkip :: (s->Step s a) -> s -> Step s a
untilNotSkip step s = case step s of
    Done -> Done
    Skip s' -> untilNotSkip step s'
    Yield a' s' -> Yield a' s'
instance Monad (Walk a) where
    return a = Walk (\_ s -> Yield a s)
    Walk t >>= f =
        Walk (\step s -> case t (untilNotSkip step) s of
            Done -> Done
            Skip _ -> error "Internal error."
            Yield b' s' -> case f b' of Walk t' -> t' step s' -- bad
        )
instance Applicative (Walk a) where
    pure = return
    (<*>) = ap
The type checker won't allow this monad instance, however. In the definition of >>=
the s
in Walk (\step s -> ...
is different from the s'
in Yield b' s' -> ...
, but it must be the same. The fundamental problem here is that (>>=) :: Walk a b -> (b->Walk a c) -> Walk a c
has two independently quantified states s
, one in the first argument, and another one that is returned by b->Walk a c
. Effectively this is (with abuse of notation)
(forall s. Walk s a b) -> (forall s'. b->Walk s' a c) -> (forall s''. Walk s'' a c)
, which makes sense neither conceptually nor to the type checker. All three s
, s'
, s''
must be the same type.
A variation where Walk
is not all-quantified over s
:
data Walk s a b = Walk ((s->Step s a) -> s -> Step s b)
allows a correct definition of bind, but then aggregate
won't work:
-- does not compile
aggregate :: Stream a -> Walk s a b -> Stream b
aggregate (Stream step s) (Walk t) = Stream (t step) s
Again, the stream state s
must always be the same. One way out of this would be to introduce a data PreStream s a = PreStream (s -> Step s a) s
, but that doesn't allow an aggregate :: Stream a -> ?? -> Stream b
either.
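To make the variation with an explicit state parameter concrete, here is a sketch of Walk s a b with instances that do type-check. Two things in it are assumptions of mine and not part of the attempts above: on a Skip, bind simply retries the walk from the new state, and aggregate takes its walk argument at a rank-2 type (forall s. Walk s a b, which needs RankNTypes) so that the walk can be instantiated at the stream's hidden state. `pairSum` is a made-up example. Treat this as one possible direction, not as a vetted solution.

```haskell
{-# LANGUAGE ExistentialQuantification, RankNTypes #-}

data Step s a = Yield a s | Skip s | Done

instance Functor (Step s) where
  fmap f (Yield a s) = Yield (f a) s
  fmap _ (Skip s)    = Skip s
  fmap _ Done        = Done

data Stream a = forall s. Stream (s -> Step s a) s

-- The state type s is an ordinary parameter, not hidden.
newtype Walk s a b = Walk ((s -> Step s a) -> s -> Step s b)

instance Functor (Walk s a) where
  fmap f (Walk t) = Walk (\step s -> fmap f (t step s))

instance Applicative (Walk s a) where
  pure b    = Walk (\_ s -> Yield b s)
  wf <*> wx = wf >>= \f -> fmap f wx

instance Monad (Walk s a) where
  Walk t >>= f = Walk go
    where
      go step s = case t step s of
        Done       -> Done
        Skip s'    -> go step s'  -- assumption: retry from the new state
        Yield b s' -> case f b of Walk t' -> t' step s'

get :: Walk s a a
get = Walk (\step s -> step s)

-- Assumption: the walk must work for every state type, so it can be
-- used at whatever state the stream hides.
aggregate :: Stream a -> (forall s. Walk s a b) -> Stream b
aggregate (Stream step s0) w = case w of
  Walk t -> Stream (t step) s0

fromList :: [a] -> Stream a
fromList zs = Stream step zs
  where
    step (x:xs) = Yield x xs
    step []     = Done

toList :: Stream a -> [a]
toList (Stream step s0) = go s0
  where
    go s = case step s of
      Yield a s' -> a : go s'
      Skip s'    -> go s'
      Done       -> []

-- Made-up example: read two inputs, emit their sum.
pairSum :: Walk s Int Int
pairSum = do
  x <- get
  y <- get
  pure (x + y)
```

Here `toList (aggregate (fromList [1, 2, 3, 4]) pairSum)` gives `[3,7]`: each step of the new stream runs the whole walk once from the current state. If the input ends in the middle of a walk, the partially read elements are dropped. A walk that keeps returning Skip on an infinite input would loop, so the retry rule is a simplification.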
The source code is on GitHub.