JavaScript: Folding infinite streams (generator function)
In Java it is possible to declare and fold infinite streams like so

List<Integer> collect = Stream.iterate(0, i -> i + 2)
    .map(i -> i * 3)
    .filter(i -> i % 2 == 0)
    .limit(5)
    .collect(Collectors.toList());

// -> [0, 6, 12, 18, 24]

In JavaScript I could use generator functions to yield and spread a stream of values.

// Limit the values in the generator
let generator = (function* () {
    for (let i=0; i<10; i++) {
        yield i
    }
})()

[ ...generator ]
    .map(i => i * 3)
    .filter(i => i % 2 === 0)

// -> [0, 6, 12, 18, 24]

But how could I stream and fold an infinite stream? I know I could iterate and limit the stream with a for (const n of generator) loop. But is it possible with a fluent API like the Java example?
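For reference, the imperative version I mean looks something like this (a sketch of the same stream, limited by hand) -

```javascript
// the same infinite stream of even numbers, as a generator
const evens = (function* () {
    let i = 0
    while (true) {
        yield i
        i += 2
    }
})()

// iterate manually, breaking out once enough values are collected
const result = []
for (const n of evens) {
    if (result.length === 5) break
    result.push(n * 3)
}

console.log(result) // -> [0, 6, 12, 18, 24]
```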

Hewes answered 17/1, 2019 at 17:56

Here's an example -

// a terminating generator
const range = function* (from, to)
{ while (from < to)
    yield from++
}

// higher-order generator
const G =
  range(0, 100).filter(isEven).map(square)

for (const x of G)
  console.log(x)

// (0*0) (2*2) (4*4) (6*6) (8*8) ...
// 0 4 16 36 64 ...

We can make something like this possible by extending the generator prototype -

const Generator =
  Object.getPrototypeOf(function* () {})

Generator.prototype.map = function* (f, context)
{ for (const x of this)
    yield f.call(context, x)
}

Generator.prototype.filter = function* (f, context)
{ for (const x of this)
    if (f.call(context, x))
      yield x
}

Here is the full program so far -

const Generator =
  Object.getPrototypeOf(function* () {})

Generator.prototype.map = function* (f, context)
{ for (const x of this)
    yield f.call(context, x)
}

Generator.prototype.filter = function* (f, context)
{ for (const x of this)
    if (f.call(context, x))
      yield x
}

// example functions
const square = x =>
  x * x

const isEven = x =>
  (x & 1) === 0
  
// a terminating generator
const range = function* (from, to)
{ while (from < to)
    yield from++
}

// higher-order generator
for (const x of range(0, 100).filter(isEven).map(square))
  console.log(x)

// (0*0) (2*2) (4*4) (6*6) (8*8) ...
// 0 4 16 36 64 ...

Moving on, something like fold or collect assumes that the stream eventually terminates, otherwise it cannot return a value -

Generator.prototype.fold = function (f, acc, context)
{ for (const x of this)
    acc = f.call(context, acc, x)
  return acc
}

const result =
  range(0, 100)      // <- a terminating stream
    .filter(isEven)
    .map(square)
    .fold(add, 0)    // <- assumes the generator terminates

console.log(result)
// 161700

If you have to fold an infinite stream, you can implement limit -

Generator.prototype.limit = function* (n)
{ for (const x of this)
    if (n-- === 0)
      break // <-- stop the stream
    else
      yield x
}

// an infinite generator
const range = function* (x = 0)
{ while (true)
    yield x++
}

// fold an infinite stream using limit
const result =
  range(0)          // infinite stream, starting at 0
    .limit(100)     // limited to 100 values
    .filter(isEven) // only pass even values
    .map(square)    // square each value
    .fold(add, 0)   // fold values together using add, starting at 0

console.log(result)
// 161700

Here is the full program -

const Generator =
  Object.getPrototypeOf(function* () {})

Generator.prototype.map = function* (f, context)
{ for (const x of this)
    yield f.call(context, x)
}

Generator.prototype.filter = function* (f, context)
{ for (const x of this)
    if (f.call(context, x))
      yield x
}

Generator.prototype.fold = function (f, acc, context)
{ for (const x of this)
    acc = f.call(context, acc, x)
  return acc
}

Generator.prototype.limit = function* (n)
{ for (const x of this)
    if (n-- === 0)
      break // <-- stop the stream
    else
      yield x
}

const square = x =>
  x * x

const isEven = x =>
  (x & 1) === 0
  
const add = (x, y) =>
  x + y

// an infinite generator
const range = function* (x = 0)
{ while (true)
    yield x++
}

// fold an infinite stream using limit
const result =
  range(0)          // starting at 0
    .limit(100)     // limited to 100 values
    .filter(isEven) // only pass even values
    .map(square)    // square each value
    .fold(add, 0)   // fold values together using add, starting at 0

console.log(result)
// 161700

Above, notice how moving limit after the filter expression changes the result -

const result =
  range(0)          // starting at 0
    .filter(isEven) // only pass even values
    .limit(100)     // limited to 100 values
    .map(square)    // square each value
    .fold(add, 0)   // fold values together using add, starting at 0

console.log(result)
// 1313400

In the first program -

  1. start with an infinite range (0, 1, 2, 3, 4, ...)
  2. limit to 100 values (0, 1, 2, 3, 4, ..., 97, 98, 99)
  3. only pass even values (0, 2, 4, ..., 94, 96, 98)
  4. square each value (0, 4, 16, ..., 8836, 9216, 9604)
  5. fold values using add, starting at 0 (0 + 0 + 4 + 16 + ... + 8836 + 9216 + 9604)
  6. result 161700

In the second program -

  1. start with an infinite range (0, 1, 2, 3, 4, ...)
  2. only pass even values (0, 2, 4, ...)
  3. limit to 100 values (0, 2, 4, 6, 8, ..., 194, 196, 198)
  4. square each value (0, 4, 16, 36, 64, ..., 37636, 38416, 39204)
  5. fold values using add, starting at 0 (0 + 4 + 16 + 36 + 64 + ... + 37636 + 38416 + 39204)
  6. result 1313400

Finally we implement collect, which unlike fold, does not ask for an initial accumulator. Instead, the first value is manually pumped from the stream and used as the initial accumulator. The stream is resumed, folding each value with the previous one -

Generator.prototype.collect = function (f, context)
{ let { value } = this.next()
  for (const x of this)
    value = f.call(context, value, x)
  return value
}

const toList = (a, b) =>
  [].concat(a, b)

range(0,100).collect(toList)
// [ 0, 1, 2, 3, ..., 97, 98, 99 ]

range(0,100).collect(add)
// 4950

And watch out for double-consuming your streams! JavaScript does not give us persistent iterators, so once a stream is consumed, you cannot reliably call other higher-order functions on the stream -

// create a stream
const stream  =
  range(0)
    .limit(100)
    .filter(isEven)
    .map(square)

console.log(stream.fold(add, 0)) // 161700
console.log(stream.fold(add, 0)) // 0 (stream already exhausted!)

// create another stream
const stream2  =
  range(0)
    .limit(100)
    .filter(isEven)
    .map(square)

console.log(stream2.fold(add, 0)) // 161700
console.log(stream2.fold(add, 0)) // 0 (stream2 exhausted!)

This is likely to happen when you're doing something like merge -

const r =
  range (0)

r.merge(r, r).limit(3).fold(append, [])
// double consume! bug!
// [ [ 0, 1, 2 ], [ 3, 4, 5 ], [ 6, 7, 8 ] ]
// expected:
// [ [ 0, 0, 0 ], [ 1, 1, 1 ], [ 2, 2, 2 ] ]

// fresh range(0) each time
range(0).merge(range(0), range(0)).limit(3).fold(append, [])
// correct:
// [ [ 0, 0, 0 ], [ 1, 1, 1 ], [ 2, 2, 2 ] ]

Using a fresh generator (range(0)...) each time avoids the problem -

const stream =
  range(0)
    .merge
      ( range(0).filter(isEven)
      , range(0).filter(x => !isEven(x))
      , range(0).map(square)
      )
    .limit(10)

console.log ('natural + even + odd + squares = ?')
for (const [ a, b, c, d ] of stream)
  console.log (`${ a } + ${ b } + ${ c } + ${ d } = ${ a + b + c + d }`)

// natural + even + odd + squares = ?
// 0 + 0 + 1 + 0 = 1
// 1 + 2 + 3 + 1 = 7
// 2 + 4 + 5 + 4 = 15
// 3 + 6 + 7 + 9 = 25
// 4 + 8 + 9 + 16 = 37
// 5 + 10 + 11 + 25 = 51
// 6 + 12 + 13 + 36 = 67
// 7 + 14 + 15 + 49 = 85
// 8 + 16 + 17 + 64 = 105
// 9 + 18 + 19 + 81 = 127

This is the key reason to parameterize our generators: it forces you to think about reusing them properly. So instead of defining a stream as a const, as above, our streams should always be functions, even nullary ones -

// streams should be a function, even if they don't accept arguments
// guarantees a fresh iterator each time
const megaStream = (start = 0, limit = 1000) =>
  range(start) // natural numbers
    .merge
      ( range(start).filter(isEven) // evens
      , range(start).filter(x => !isEven(x)) // odds
      , range(start).map(square) // squares
      )
    .limit(limit)

const print = s =>
{ for (const x of s)
    console.log(x)
}

print(megaStream(0).merge(megaStream(10, 3)))
// [ [ 0, 0, 1, 0 ], [ 10, 10, 11, 100 ] ]
// [ [ 1, 2, 3, 1 ], [ 11, 12, 13, 121 ] ]
// [ [ 2, 4, 5, 4 ], [ 12, 14, 15, 144 ] ]

print(megaStream(0).merge(megaStream(10), megaStream(100)).limit(5))
// [ [ 0, 0, 1, 0 ], [ 10, 10, 11, 100 ], [ 100, 100, 101, 10000 ] ]
// [ [ 1, 2, 3, 1 ], [ 11, 12, 13, 121 ], [ 101, 102, 103, 10201 ] ]
// [ [ 2, 4, 5, 4 ], [ 12, 14, 15, 144 ], [ 102, 104, 105, 10404 ] ]
// [ [ 3, 6, 7, 9 ], [ 13, 16, 17, 169 ], [ 103, 106, 107, 10609 ] ]
// [ [ 4, 8, 9, 16 ], [ 14, 18, 19, 196 ], [ 104, 108, 109, 10816 ] ]

We can implement merge like so; note that it yields tuples of values until any one of the merged streams is exhausted -

Generator.prototype.merge = function* (...streams)
{ let river = [ this ].concat(streams).map(s => [ s, s.next() ])
  while (river.every(([ _, { done } ]) => done === false))
  { yield river.map(([ _, { value } ]) => value)
    river = river.map(([ s, _ ]) => [ s, s.next() ])
  }
}

Here is the full program -

const Generator =
  Object.getPrototypeOf(function* () {})

Generator.prototype.map = function* (f, context)
{ for (const x of this)
    yield f.call(context, x)
}

Generator.prototype.filter = function* (f, context)
{ for (const x of this)
    if (f.call(context, x))
      yield x
}

Generator.prototype.limit = function* (n)
{ for (const x of this)
    if (n-- === 0)
      break // <-- stop the stream
    else
      yield x
}

Generator.prototype.merge = function* (...streams)
{ let river = [ this ].concat(streams).map(s => [ s, s.next() ])
  while (river.every(([ _, { done } ]) => done === false))
  { yield river.map(([ _, { value } ]) => value)
    river = river.map(([ s, _ ]) => [ s, s.next() ])
  }
}

const isEven = x =>
  (x & 1) === 0

const square = x =>
  x * x

const range = function* (x = 0)
{ while (true)
    yield x++
}

// streams should be functions, even if they don't have parameters
const megaStream = (start = 0, limit = 1000) =>
  range(start) // natural numbers
    .merge
      ( range(start).filter(isEven) // evens
      , range(start).filter(x => !isEven(x)) // odds
      , range(start).map(square) // squares
      )
    .limit(limit)

// for demo only
const print = s =>
{ for (const x of s) console.log(x) }

print(megaStream(0).merge(megaStream(10, 3)))
// [ [ 0, 0, 1, 0 ], [ 10, 10, 11, 100 ] ]
// [ [ 1, 2, 3, 1 ], [ 11, 12, 13, 121 ] ]
// [ [ 2, 4, 5, 4 ], [ 12, 14, 15, 144 ] ]

print(megaStream(0).merge(megaStream(10), megaStream(100)).limit(5))
// [ [ 0, 0, 1, 0 ], [ 10, 10, 11, 100 ], [ 100, 100, 101, 10000 ] ]
// [ [ 1, 2, 3, 1 ], [ 11, 12, 13, 121 ], [ 101, 102, 103, 10201 ] ]
// [ [ 2, 4, 5, 4 ], [ 12, 14, 15, 144 ], [ 102, 104, 105, 10404 ] ]
// [ [ 3, 6, 7, 9 ], [ 13, 16, 17, 169 ], [ 103, 106, 107, 10609 ] ]
// [ [ 4, 8, 9, 16 ], [ 14, 18, 19, 196 ], [ 104, 108, 109, 10816 ] ]
Splendent answered 17/1, 2019 at 19:3

Here is an alternative approach to the given answer.

1. Functional API

First create a functional API.

const itFilter = p => function* (ix) {
  for (const x of ix)
    if (p(x))
      yield x;
};

const itMap = f => function* (ix) {
  for (const x of ix)
    yield f(x);
};

const itTake = n => function* (ix) {
  let m = n;
  
  for (const x of ix) {
    if (m-- === 0)
      break;

    yield x;
  }
};

const comp3 = f => g => h => x =>
  f(g(h(x)));

const xs = [1,2,3,4,5,6,7,8,9,10];

const stream = comp3(itTake(3))
  (itFilter(x => x % 2 === 0))
    (itMap(x => x * 3));

console.log(
  Array.from(stream(xs))
); // -> [6, 12, 18]

2. Box-Type

Next, define a Box type to enable method chaining over arbitrary functional APIs.

function Box(x) {
  return new.target ? (this.x = x, this) : new Box(x)
}

Box.prototype.map = function map(f) {return new Box(f(this.x))};
Box.prototype.fold = function fold(f) {return f(this.x)};

3. Method Chaining

Finally, use the new Box type to chain methods.

const itFilter = p => function* (ix) {
  for (const x of ix)
    if (p(x))
      yield x;
};

const itMap = f => function* (ix) {
  for (const x of ix)
    yield f(x);
};

const itTake = n => function* (ix) {
  let m = n;
  
  for (const x of ix) {
    if (m-- === 0)
      break;
      
    yield x;
  }
};

const xs = [1,2,3,4,5,6,7,8,9,10];

function Box(x) {
  return new.target ? (this.x = x, this) : new Box(x)
}

Box.prototype.map = function map(f) {return new Box(f(this.x))};
Box.prototype.fold = function fold(f) {return f(this.x)};

const stream = Box(xs)
  .map(itMap(x => x * 3))
  .map(itFilter(x => x % 2 === 0))
  .map(itTake(3))
  .fold(x => x);
  
console.log(
  Array.from(stream)
); // -> [6, 12, 18]

Box gives you a fluent API for free.
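To close the loop with the fold from the original question, the same functional API can be extended with a fold combinator; itFold is a name I am introducing here, not part of the snippets above -

```javascript
// itFold: consume an iterable and fold it into a single value
const itFold = f => acc => function (ix) {
  for (const x of ix)
    acc = f(acc)(x);
  return acc;
};

const itMap = f => function* (ix) {
  for (const x of ix)
    yield f(x);
};

const itTake = n => function* (ix) {
  let m = n;
  for (const x of ix) {
    if (m-- === 0) break;
    yield x;
  }
};

function Box(x) {
  return new.target ? (this.x = x, this) : new Box(x)
}

Box.prototype.map = function map(f) {return new Box(f(this.x))};
Box.prototype.fold = function fold(f) {return f(this.x)};

// an infinite generator, made finite with itTake, then folded
const nats = function* () { let n = 0; while (true) yield n++; };

const sum = Box(nats())
  .map(itMap(x => x * 3))    // 0, 3, 6, 9, 12, ...
  .map(itTake(5))            // 0, 3, 6, 9, 12
  .fold(itFold(a => b => a + b)(0));

console.log(sum); // -> 30
```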

Consolation answered 17/1, 2019 at 19:55 Comment(1)
@user633183 Hihi, I didn't see these eta reductions because of the mix of arrow and normal functions...Consolation

I'll add another answer which might be what you're looking for. I'm the author of scramjet, a framework based on streams which adds a fluent API to transforms. What you want can be achieved pretty easily with it:

import {DataStream} from "scramjet";
let i = 0;
const out = await (
    DataStream.from(function*() { let n = 2; while (true) yield n++; })
        .map(n => n+2)
        .filter(n => n % 2 === 0)
        .until(() => i++ === 10)
        .toArray()
);

I built it mostly for asynchronous operations (so you can just replace any of those functions with async ones and it will work exactly the same). So the answer to whether this is possible is yes.

One note though: the node.js streams this is based on have internal buffers, so the generator will probably be iterated a couple more times than the until method allows.

Swap answered 19/1, 2019 at 18:1
