What is the "reactive" way to read file line-by-line

I'm learning reactive programming using RxJS and encountered a case where I need to read a file line by line. I actually solved it with a solution like this:

https://gist.github.com/yvele/447555b1c5060952a279

It works, but I need to use some plain JS code to transform the stream of Buffers into a stream of lines (using the "readline" module in the example above).

I wonder if there are other ways to transform an Observable of Buffers into an Observable of lines using RxJS operators, like in the example below.

var Rx = require('rx');
var fs = require('fs');

var stream = fs.createReadStream('file.txt'); // placeholder path
var lines = Rx.Observable
  .fromEvent(stream, 'data') // emits Buffers over time
  // some transforms ...
  .subscribe(
    (line) => console.log(line), // emit strings line by line
    err => console.log("Error: %s", err),
    () => console.log("Completed")
  );
Mutilate answered 17/8, 2016 at 8:12 Comment(1)
what do you consider normal js code?Rhoden

You can probably achieve something pretty close to what you want with scan and concatMap.

Something like:

bufferSource
  .concat(Rx.Observable.of("\n")) // to make sure we don't miss the last line!
  .scan(({ buffer }, b) => {
    const splitted = buffer.concat(b).split("\n");
    const rest = splitted.pop();
    return { buffer: rest, items: splitted };
  }, { buffer: "", items: [] })
  // Each item here is a pair { buffer: string, items: string[] }
  // such that buffer contains the remaining input text that has no newline
  // and items contains the lines that have been produced by the last buffer
  .concatMap(({ items }) => items)
  // we flatten this into a sequence of items (strings)
  .subscribe(
    item => console.log(item),
    err => console.log(err),
    () => console.log("Done with this buffer source"),
  );
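The accumulator passed to scan is just a pure reducer, so it can be sketched and tested on its own without RxJS. A minimal sketch, assuming the chunks arrive as strings (the function names here are illustrative, not from any library):

```javascript
// Pure version of the accumulator used with scan above.
// State is { buffer, items }: buffer holds the text after the last
// newline, items holds the complete lines produced by the latest chunk.
function splitChunk({ buffer }, chunk) {
  const parts = (buffer + chunk).split("\n");
  const rest = parts.pop(); // kept for the next round
  return { buffer: rest, items: parts };
}

// Folding the chunks through the reducer yields the lines scan would
// emit; the appended "\n" flushes the final unterminated line, just
// like the concat(Rx.Observable.of("\n")) step in the answer.
function linesFromChunks(chunks) {
  const out = [];
  let state = { buffer: "", items: [] };
  for (const chunk of chunks.concat("\n")) {
    state = splitChunk(state, chunk);
    out.push(...state.items);
  }
  return out;
}
```

Because the reducer is pure, lines are emitted correctly even when a newline falls in the middle of a chunk or a line spans several chunks.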
Domoniquedomph answered 17/8, 2016 at 16:3 Comment(3)
Exactly what I want. Seems like I missed the scan operator. Thank you.Mutilate
I just noticed something: does your code not emit the last line?Mutilate
Oh, if the last line is not terminated by "\n", indeed it will get lost! Let me try to fix it. EDIT: Fixed it on line 2 I think.Domoniquedomph

I tried a bunch of the above answers and built my own ugly version. Then I poked around the RxJS code on GitHub and found that RxJS handles stream-like objects directly: there's no point in mucking around with events. Just pass a ReadStream to from; it tests it for ReadableStreamLike and then turns it into an AsyncGenerator.

import * as fs from 'node:fs';
import * as readline from 'node:readline';
import { from } from 'rxjs';

const file = fs.createReadStream(fileName);
const line = readline.createInterface({ input: file });

const subscription = from(line).subscribe({
  next:  (dat) => { ... },
  error: (err) => { ... },
  complete: () => { ... }
});
Inherited answered 27/12, 2022 at 5:47 Comment(0)

You can use the following class:

'use strict'

const lineReader = require('line-reader');
const Rx = require('rxjs');
const RxOp = require('rxjs/operators');

class CSVReader {
    constructor(filepath) {
        this.filepath = filepath;
    }

    readByLines() 
    {
        const source = new Rx.Subject();

        lineReader.open(this.filepath, (err, reader)=> {
            Rx.of(0).pipe(
                RxOp.expand(val => {
                    reader.nextLine((err2, line) => source.next(line));
                    return Rx.of(1 + val);
                }),
                RxOp.takeWhile(_=> { 
                    let has = reader.hasNextLine();
                    if(!has) source.complete();
                    return has;
                })
            ).subscribe(_=>_);
        })

        return source;        
    }
}

module.exports = CSVReader

and use it as follows

const { bufferCount } = require('rxjs/operators');

let reader = new CSVReader('path/to/file');

reader.readByLines()
    .pipe(bufferCount(2)) // chunk size
    .subscribe(chunk=> {
        console.log({chunk});
    });
Trapani answered 24/10, 2020 at 0:42 Comment(0)

I would do it like this:

const readline = require('readline');
const fs = require('fs');
const path = require('path');
const {fromEvent, race, Observable} = require('rxjs');
const {tap, takeUntil, take, map} = require('rxjs/operators');



const rl = readline.createInterface({
    input: fs.createReadStream(path.resolve('./', 'myfile'))
});


let obs = new Observable(observer => {
    rl.on('line', val => observer.next(val));
    rl.on('error', err => observer.error(err));
    rl.on('close', () => observer.complete());
})
.pipe(tap(line=>console.log(`line: ${line}`)))

obs.subscribe(()=>{},
   (e)=>console.log(`Error reading file: ${e}`),
   ()=>console.log("Read complete"))

An alternative for creating the observable could be:

let obs = fromEvent(rl, 'line')
.pipe(
    takeUntil(race(
        fromEvent(rl, 'close').pipe(take(1))  , 
        fromEvent(rl, 'error').pipe(map((err)=>{throw err}))   
    )))

Ideally, rxjs could have provided an operator like: fromEvent(emitter, nextEvent, errorEvent, completeEvent ) to help keep the above code even simpler.

Doorstep answered 25/1, 2021 at 23:42 Comment(0)

I was struggling with this for a while and used TypeScript to solve some of the weird issues I was getting, so I hope you can work with this solution. Let me know if you'd prefer a vanilla JavaScript implementation :)

So, assuming you are trying to read from a CSV file, the following method is the cleanest implementation I have found to return an observable with the data parsed into objects after reading each line.

You can use this method to work with your data before you call lines.push(row); or you can call readTabFile() and work with the stream from there.

In this example I used a tab-separated file, but you can also use this approach for CSV files. It uses csv-parse to map the data onto the right interface.

import * as fs from 'fs';
import { parse } from 'csv-parse';
import type { Parser } from 'csv-parse';
import { Observable } from 'rxjs';

interface Columns {
  columnA: string;
  columnB: string;
}

function readTabFile(): Observable<Columns[]> {
  const parser: Parser = parse({
    delimiter: '\t',
    columns: ['columnA', 'columnB'],
  });
  return new Observable((observer) => {
    const lines: Columns[] = [];
    const stream = fs.createReadStream('./file.TAB', {
      encoding: 'utf8',
    });

    parser.on('data', (row: Columns) => {
      lines.push(row);
    });

    parser.on('error', (err) => {
      observer.error(err);
    });

    parser.on('end', () => {
      observer.next(lines);
      observer.complete();
    });

    stream.pipe(parser);
  });
}
Mazman answered 12/3, 2023 at 13:2 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.