How to collect array of emitted values from Observable.from?
Asked Answered
C

3

13

So in Rxjs, I have bunch of code,

return Observable.from(input_array)
           .concatMap((item)=>{
               //this part emits an Observable.of<string> for each item in the input_array
           })
           .scan((output_array:string[],each_item_output_array:string)=>{
               return output_array.push(each_item_output_array) ;
           });

But apparently this is wrong, the scan will break the code inside the concatMap, so I want to know how to collect the output array of each item in the observable from operator?

Clasp answered 18/2, 2017 at 6:0 Comment(3)
What do you mean emits a string? The project function passed to concatMap is supposed to return an observable, a promise or an array - not a string.Sabelle
@cartant, it's the Observable of string, it is actually a string, of course also the observable.Clasp
Actually, I take that back - you can return a string. concatMap will iterate it and emit its individual characters.Sabelle
S
51

In your call to scan you have not specified a seed for the accumulator. In that circumstance, the first value is used as a seed. For example:

Rx.Observable
  .from(["a", "b", "c"])
  .scan((acc, value) => acc + value)
  .subscribe(value => console.log(value));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

In your snippet, the first value is not an array, so you cannot call push on it. To accumulate the values into an array, you can specify an array seed like this:

Rx.Observable
  .from(["a", "b", "c"])
  .concatMap(value => Rx.Observable.of(value))
  .scan((acc, value) => {
    acc.push(value);
    return acc;
  }, []) // Note that an empty array is use as the seed
  .subscribe(value => console.log(JSON.stringify(value)));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

Although, for some use cases, it would be preferable to not mutate the array:

Rx.Observable
  .from(["a", "b", "c"])
  .concatMap(value => Rx.Observable.of(value))
  .scan((acc, value) => [...acc, value], [])
  .subscribe(value => console.log(JSON.stringify(value)));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

Note that scan emits an array for each value that it receives. If you only want a single array emitted when the observable completes, you can use the toArray operator instead:

Rx.Observable
  .from(["a", "b", "c"])
  .concatMap(value => Rx.Observable.of(value))
  .toArray()
  .subscribe(value => console.log(JSON.stringify(value)));
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>
Sabelle answered 18/2, 2017 at 6:47 Comment(1)
If I could upvote this a hundred times I would. toArray solved a problem I'd been working on all day. Thank you!Booma
F
5

Be carefull with this code:

      const obs = Rx.Observable
      .from(["a", "b", "c"])
      .concatMap(value => Rx.Observable.of(value))
      .scan((acc, value) => {
        acc.push(value);
        return acc;
      }, []); 
      obs.subscribe(value => console.log(JSON.stringify(value)));
      obs.subscribe(value => console.log(JSON.stringify(value)));

Result will be a bit unexpected:

 ["a"]
 ["a","b"]
 ["a","b","c"]
 ["a","b","c","a"]
 ["a","b","c","a","b"]
 ["a","b","c","a","b","c"]

"acc" variable is reference object and each subscriber gets stream data and adds data to the same object again. It might be a lot of solutions for avoiding it, this is creation new object when stream data is received again :

    var obs = Rx.Observable
          .from(["a", "b", "c"])
          .concatMap(value => Rx.Observable.of(value))
          .scan((acc, value) => {
          //clone initial value
            if (acc.length == 0) {

                   acc = [];
                }
            acc.push(value);
            return acc 
          }, []); // Note that an empty array is use as the seed
          obs.subscribe(value => console.log(JSON.stringify(value)));
          obs.subscribe(value => console.log(JSON.stringify(value)));

result as expected:

 ["a"]
 ["a","b"]
 ["a","b","c"]
 ["a"]
 ["a","b"]
 ["a","b","c"]

I hope it saves a lot time for someone

Fiddler answered 11/5, 2019 at 6:36 Comment(2)
Instead of creating a new empty array when the first element comes, you can return a new array on every iteration: return [...acc, value]. This is a more functional approach and browsers are quite good in optimizing this.Colossus
@sashee it is good point, there is lot of solution and the main idea is breaking reference to object with array for each stream dataFiddler
H
2

Another option is bufferCount(count) if you know the length of the input array you can get a single output containing that number of items. Cleaner syntax than having to remember how to use scan.

Note: If you don't know the size (although in your example you would) then count represents a maximum - but this may have resource constraints so don't just make it 99999.

 const array = source$.pipe(bufferCount(10));

In my case I had a list of 'operations' being executed and I knew the total number of steps in advance, so bufferCount worked quite well. However be sure to carefully consider error conditions

 // one approach to handling errors that still returns an output array
 // only use this if a single failure shouldn't stop the collection
 const array = source$.pipe(
                    map((result) => ({ hasError: false, result: result }),
                    catchError((err) => ({ hasError: true, error: err }),
                    bufferCount(10));

The decision on how to handle errors will vary greatly based on what your observables actually are - but the point here is to show bufferCount() as an option.

(There are other buffer operations available too)

Hetaerism answered 12/1, 2019 at 23:41 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.