How to take an element of a stream (Elixir) and keep the state of the stream?

I love Elixir streams. In particular I've got a function which constructs an infinite stream of prime numbers; you can take the first 10,000, or all the primes below 1,000,000, or whatever, with the appropriate Stream/Enum operations (Enum.take(10_000) or Enum.take_while(& (&1 < 1_000_000)) respectively).

But suppose I don't know in advance how many primes I need. I get to a certain point and realize I actually need another thousand primes. Is there a way to, say, get the first 10,000 elements of a stream, then save the resulting stream object somehow, so that I can get the next 1,000 on demand (and repeatedly, of course)?

Cliff answered 23/2, 2017 at 14:30 Comment(1)
This looks like what you want: github.com/tallakt/stream_split/blob/master/lib/stream_split.ex. I tried implementing something similar myself first with Enumerable.reduce but then decided to search for existing solutions since it wasn't very straightforward.Cadmus
B
5

TL;DR Save the accumulator, not “Stream.”

The robust solution is given by @Dogbert in the comments: the StreamSplit package seems to do exactly what was asked.

For the sake of history, my answer is: there are many Stream functions (all derived from Stream.transform/4, which is a generic stream implementation for nearly everything one might need) that might do the trick. For instance, consider Fibonacci numbers. One might implement them as:

stream = Stream.iterate({1, 1}, fn {acc, i} ->
  {acc + i, acc}
end)
#⇒ #Function<61.36862645/2 in Stream.unfold/2>
stream |> Enum.take(5) 
#⇒ [{1, 1}, {2, 1}, {3, 2}, {5, 3}, {8, 5}]
current = stream |> Enum.take(5) |> List.last
#⇒ {8, 5}

If you want to continue getting numbers:

#              ⇓⇓⇓⇓⇓⇓
Stream.iterate({8, 5}, fn {acc, i} ->
  {acc + i, acc}
end)

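Just keep the intermediate state and pass it as the initial value to the stream function you are using to get primes. Note that a stream resumed this way emits the saved state again as its first element, so either drop that element or save the state that follows the last one you consumed. I personally do not see any advantage in keeping the “tail” instead of the accumulator, but I may well be wrong.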
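To make the "save the accumulator" idea concrete, here is a minimal sketch built on the Fibonacci generator above. The module name `FibChunks` and its `take/2` function are hypothetical helpers, not part of any library. The function takes one element more than requested, so that the returned state is the one to resume from and nothing is emitted twice.

```elixir
defmodule FibChunks do
  # Hypothetical helper: one step of the generator from the answer above.
  defp step({acc, i}), do: {acc + i, acc}

  # Returns {the next n states, the state to resume from}.
  def take(state, n) when is_integer(n) and n > 0 do
    # Take n + 1 elements: n to return, plus the state for the next call.
    items = state |> Stream.iterate(&step/1) |> Enum.take(n + 1)
    {Enum.take(items, n), List.last(items)}
  end
end

{first, state} = FibChunks.take({1, 1}, 5)
IO.inspect(first)
# [{1, 1}, {2, 1}, {3, 2}, {5, 3}, {8, 5}]

{more, _next_state} = FibChunks.take(state, 3)
IO.inspect(more)
# [{13, 8}, {21, 13}, {34, 21}]
```

The caller only ever stores a plain tuple, which is easy to persist or pass around, at the cost of rebuilding the stream on every call.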

Brushwood answered 23/2, 2017 at 14:58 Comment(7)
This certainly works, but it's odd to have to either (a) repeat the generation code (easy in your example but not in others) or (b) leave the stream ecosystem. I'm surprised there isn't a function in Stream which achieves this ...Cliff
I believe you can save the stream state, but it looks a little complicated: github.com/tallakt/stream_split/blob/master/lib/stream_split.ex.Cadmus
@Cadmus this is a contrived example that would possibly fail when used with Stream.zip. Any Stream reducer accepts a {:suspend, acc} tuple on an iteration step, so implementing this should not be that hard. Unfortunately, the Stream API is not quite mature yet: half of it uses lazy %Stream{} reducers, the other half uses plain function/2 reducers. Also, it partially mixes internal and external state. I would go with saving the accumulator, which is both more robust and more natural.Brushwood
@RichardRast if you don’t want to leave the Stream ecosystem, whatever that means, just use Stream.transform/4 and return a {:suspend, acc} tuple to suspend execution and {:cont, acc} to continue. There is no code duplication: I put the code twice to make it clearer, it is basically a call to MyProducer.continue(acc, reducer). I do not see any advantage in this approach, though, because you still need to store an accumulator, but now you need to store the reducer as well.Brushwood
@mudasobwa it seems to be working fine with Stream.zip for me. Saving an accumulator may not always be easy; for example, what would you save with, say, File.stream!? (Just an example, I couldn't find a way to do this after a quick glance but maybe there is.)Cadmus
@mudasobwa What I meant by leaving the Stream ecosystem is that, once you're storing the accumulator and producer/reducer, you're not saving any trouble by using Streams at all -- you might as well just write a "getNextAndAcc(acc)" function and call it as needed. It would be less code and work than actually using the Stream!Cliff
@RichardRast I believe Streams are intended to be run in parallel, but as soon as you have a meeting point (say, kinda mutex) in the middle of execution, I personally do not see any advantage of keeping something but the accumulator as the “tail.” I might be wrong; for the implementation please refer to Dogbert’s link. I will leave this answer intact for the sake of history.Brushwood
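For readers wondering what the {:suspend, acc} mechanism discussed in these comments looks like in practice, here is a minimal sketch built directly on Enumerable.reduce/3. The module name `Resumable` and its functions are hypothetical, and this is not the StreamSplit implementation, only an illustration of the same idea: the reducer suspends after every element, and the continuation that comes back is the "saved stream".

```elixir
defmodule Resumable do
  # Wrap any enumerable in a continuation that has not produced anything yet.
  def new(enum) do
    reducer = fn item, _acc -> {:suspend, item} end
    {:suspended, nil, cont} = Enumerable.reduce(enum, {:suspend, nil}, reducer)
    cont
  end

  # Pull up to n items. Returns {items, continuation}, where the
  # continuation is :done once the enumerable is exhausted.
  def take(cont, n) when is_integer(n) and n >= 0, do: take(cont, n, [])

  defp take(cont, 0, acc), do: {Enum.reverse(acc), cont}
  defp take(:done, _n, acc), do: {Enum.reverse(acc), :done}

  defp take(cont, n, acc) do
    case cont.({:cont, nil}) do
      {:suspended, item, next} -> take(next, n - 1, [item | acc])
      {status, _} when status in [:done, :halted] -> {Enum.reverse(acc), :done}
    end
  end
end

evens = Stream.iterate(1, &(&1 + 1)) |> Stream.map(&(&1 * 2))

{first, rest} = evens |> Resumable.new() |> Resumable.take(3)
IO.inspect(first)
# [2, 4, 6]

{more, _rest} = Resumable.take(rest, 2)
IO.inspect(more)
# [8, 10]
```

Each element is computed once, and no generator code is repeated. One caveat, as far as I understand the protocol: a stream that holds a resource (File.stream!, for instance) is only cleaned up when the continuation is finished or halted with `cont.({:halt, nil})`, so an abandoned continuation can leave the resource open.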
B
1

You have a basic misunderstanding of Streams. A Stream is about creating a composition of functions so that you can do complex processing on an enumerable with only one pass through the original enumerable.

It's easy to confuse a Stream with a Service, and with enough digging you can "pause" a Stream to create something service-like. However, what you really want is a Prime Server. The minute you start thinking about "state", you should think about a GenServer.
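A minimal sketch of such a Prime Server might look like the following. The module name `PrimeServer`, its `next/2` function, and the naive trial-division test are all assumptions made for illustration; a real prime generator would plug in where `prime?/1` is used. The only state the server keeps is the next candidate to test.

```elixir
defmodule PrimeServer do
  use GenServer

  # Client API (hypothetical names).
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Returns the next n primes that have not been handed out yet.
  def next(server, n) when is_integer(n) and n >= 0 do
    GenServer.call(server, {:next, n}, :infinity)
  end

  # Server callbacks. The state is the next candidate number to test.
  @impl true
  def init(:ok), do: {:ok, 2}

  @impl true
  def handle_call({:next, n}, _from, candidate) do
    primes =
      candidate
      |> Stream.iterate(&(&1 + 1))
      |> Stream.filter(&prime?/1)
      |> Enum.take(n)

    # Resume right after the last prime handed out (or stay put if n was 0).
    next_candidate =
      case primes do
        [] -> candidate
        _ -> List.last(primes) + 1
      end

    {:reply, primes, next_candidate}
  end

  # Naive trial division, good enough for a sketch.
  defp prime?(k), do: k > 1 and no_divisor?(k, 2)

  defp no_divisor?(k, d) when d * d > k, do: true
  defp no_divisor?(k, d), do: rem(k, d) != 0 and no_divisor?(k, d + 1)
end

{:ok, pid} = PrimeServer.start_link()
IO.inspect(PrimeServer.next(pid, 5))
# [2, 3, 5, 7, 11]
IO.inspect(PrimeServer.next(pid, 3))
# [13, 17, 19]
```

Callers never see the state at all; they just ask the process for more primes whenever they need them.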

Brickyard answered 24/2, 2017 at 16:46 Comment(0)
E
-1

It seems that Stream.take/2 and Stream.drop/2 were added to the Stream module about 4 years ago, offering a built-in answer to your question.

The code below also evaluates the stream lazily:

head = Stream.take(my_stream, 1000) |> Enum.to_list()
tail = Stream.drop(my_stream, 1000)

Here head is the first 1000 elements of your stream, and tail is a stream of the elements that follow.

Extortioner answered 29/5, 2023 at 23:36 Comment(1)
You are not considering that the computation will occur multiple times when the stream is re-evaluated. This becomes evident with side effects, such as IO.inspect/2.Aq
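The re-evaluation mentioned in this comment can be observed with a small experiment. This is only a sketch: the Agent-based counter is an illustrative stand-in for any side effect (such as IO.inspect/2), and `my_stream` here is a simple counting stream rather than the prime generator from the question.

```elixir
# Count how many times the source stream produces an element.
{:ok, counter} = Agent.start_link(fn -> 0 end)

my_stream =
  Stream.iterate(1, &(&1 + 1))
  |> Stream.each(fn _ -> Agent.update(counter, &(&1 + 1)) end)

head = my_stream |> Stream.take(3) |> Enum.to_list()
tail = my_stream |> Stream.drop(3) |> Enum.take(2)

IO.inspect(head)
# [1, 2, 3]
IO.inspect(tail)
# [4, 5]

# Only 5 distinct elements were consumed, yet the counter is larger than 5,
# because elements 1..3 were generated again when tail was evaluated.
IO.inspect(Agent.get(counter, & &1))
```

So Stream.drop/2 does not save the position in the stream. It starts from the beginning and discards the head, which gets expensive when each element is costly to compute.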
