RxJS reduce doesn't continue
Asked Answered
P

3

9

Why doesn't the flatMap cause downstream reductions to fire?

I got code like:

handleFiles.flatMap(files =>
  Rx.Observable.from(files).
  flatMap((file, i) => fileReader(file, i)).
  reduce((form, file, i) => {
    form.append('file[' + i + ']', result);
    console.log('reduce step', file);
    return form;
  }, new FormData()).
  tap(console.log.bind(console, 'after reduce'))
).
subscribe(console.log.bind(console, 'response'));

And the problem is that the 'after reduce' tap is never hit. Why?

The log is like:

reduce step [data]
reduce step [data]

Screenshot:

Error screenshot

Phototypy answered 8/11, 2015 at 16:4 Comment(0)
P
2

If files is an array, then reduce should terminate if the observable returned from fileReader does. So for this code, the problem was that fileReader returned an observable that didn't complete.

Phototypy answered 8/11, 2015 at 16:41 Comment(0)
S
20

The problem isn't in flatMap; it's in the way reduce works.

reduce reads in a whole stream and reduces it to a single value, emitted only when the source stream is closed. If your from(files) stream doesn't end, then reduce will never output its value.

Try using scan instead; it emits each intermediate step and seems to be what you're looking for.

Squad answered 8/11, 2015 at 16:17 Comment(2)
Files is a JS array, plain and simple. I added a screenshot. I had scan before and that did work... But since it's a JS array aggregate should do onComplete, right?Phototypy
However, it's a good catch. I found my mistake; not completing the subject in fileReader. Duh!Phototypy
P
2

If files is an array, then reduce should terminate if the observable returned from fileReader does. So for this code, the problem was that fileReader returned an observable that didn't complete.

Phototypy answered 8/11, 2015 at 16:41 Comment(0)
G
1

Here's an example using reduce with a non-terminating observable;

using windowTime:

import { fromEvent, interval, timer } from 'rxjs';
import { reduce, filter, windowTime, map, mergeMap } from 'rxjs/operators';

const interval$ = interval(1000);
const observable = interval$.pipe(
  windowTime(2000), // each window is 2s
  mergeMap(window$ => window$.pipe(
    reduce((a,x) => { // reduce to array
      return [...a,x];
    }, []),
    filter(x => !!x.length) // in the background timer is still running so suppress empty events
  )), // flatten the Observable-of-Observables
);
const subscription = observable.subscribe(x => console.log(x));
setTimeout(() => subscription.unsubscribe(), 10000);

using bufferTime:

import { fromEvent } from 'rxjs';
import { bufferTime, filter, map } from 'rxjs/operators';

let count = 1;
const clicks = fromEvent(document, 'click');
const observable = clicks.pipe(
  bufferTime(1000), // batch into array every 1s
  filter(x => !!x.length), // ignore events without clicks
  map(x => x.reduce((a,y) => ({...a, [count++]: y}), {})),
);
observable.subscribe(x => console.log(x));

using auditTime:

import { fromEvent } from 'rxjs';
import { tap, auditTime, map } from 'rxjs/operators';

let buffer = [];
const clicks = fromEvent(document, 'click');
const observable = clicks.pipe(
  tap((event) => buffer.push(event)),
  auditTime(1000), // buffer every 1s after 1st click is detected
  map((_lastEvent) => { // ignore last event
    const events = buffer; // save off buffer
    buffer = []; // clear buffer
    return events.reduce((a,e,i) => ({...a, [i]: e}),{});
  }),
);
observable.subscribe((events) => console.log(events));

using takeUntil and repeat:

NOTE: take/repeat will reset observable (ie. interval-counter stays at 0 and events may be lost)

import { fromEvent, timer, interval } from 'rxjs';
import { takeUntil, reduce, repeat, filter } from 'rxjs/operators';

const interval$ = interval(1000);
const timer$ = timer(2000);

const observable = interval$.pipe(
  takeUntil(timer$), // unsubscribe from stream every 2s so reduce terminates
  reduce((acc, event) => [...acc, event], []), // reduce to array of events
  filter(x => !!x.length), // suppress emission of empty stream
  repeat(), // resubscribe to stream
);
// console will only show array of [0] since takeUntil stops right when interval emits
const subscription = observable.subscribe(x => console.log(x));
setTimeout(() => subscription.unsubscribe(), 10000);
Georgeta answered 20/5, 2020 at 8:55 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.