I revisited this code years later; here is the updated version.
It should be good for large files piped into stdin, supports UTF-8 and pause/resume,
and can read a stream line by line, processing the input without choking.
StreamLinesReader Class Description:
The StreamLinesReader class is designed to process streams of text data, handling each line of input efficiently and correctly. Key features of this class include:
UTF-8 Encoding: The class sets the stream's encoding to UTF-8, ensuring that multibyte UTF-8 characters are never split down the middle at chunk boundaries.
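To see why this matters, here is a minimal standalone sketch (not part of the class) of what goes wrong when a chunk boundary falls inside a multibyte character:

// 'é' is two bytes in UTF-8 (0xC3 0xA9)
const buf = Buffer.from('é', 'utf8');
// Decoding only the first byte, as naive chunk handling might do:
console.log(buf.subarray(0, 1).toString('utf8')); // '�' (broken character)
// With stream.setEncoding('utf8'), Node decodes via StringDecoder internally,
// so 'data' events always deliver complete characters.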
Pause-Resume Mechanism: The class pauses the input stream upon receiving data. This controls the flow of data, which is essential when the line-processing function is slower than the rate at which data arrives. Pausing the stream keeps memory use bounded and ensures that each line is processed sequentially without losing any data.
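In isolation, the pattern looks like this (a sketch; handleChunk is a hypothetical stand-in for any slow asynchronous per-chunk work):

stream.on('data', (chunk) => {
    stream.pause();                    // no more 'data' events while we work
    handleChunk(chunk)                 // hypothetical slow async processing
        .then(() => stream.resume());  // request the next chunk only when done
});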
Accumulation of Incomplete Lines: Data chunks do not always end with a newline character, so the class accumulates partial lines in a buffer and holds them until a complete line (ending with a newline) has arrived. Lines are therefore processed only when they are complete, preserving the integrity of the data.
Handling of Split Lines: When a chunk of data is received, the class splits it into lines and processes each complete line individually. If the last part of the chunk does not end with a newline, it is buffered rather than emitted right away. Once all complete lines have been processed, the stream is resumed to receive more data, allowing the buffered partial line to be completed by the next chunk (see the trace below).
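A short standalone trace of this buffering logic, assuming the input "first line\nsecond line\n" arrives split mid-line across three chunks:

const newlines = /\r\n|\n/;
let buffer = [];
for (const chunk of ['first li', 'ne\nsecond li', 'ne\n']) {
    const lines = chunk.split(newlines);
    buffer.push(lines[0]);                   // may complete a buffered partial line
    for (let i = 1; i < lines.length; i++) {
        console.log('complete line:', buffer.join(''));
        buffer = [lines[i]];                 // last element may start the next line
    }
}
// prints: complete line: first line
//         complete line: second line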
Stream End Processing: When the end of the stream is reached, the class checks whether any data remains in the buffer (an incomplete final line) and processes it. Additionally, the class can notify you when stream processing is complete: the waitEnd() method returns a promise that resolves once the stream has been fully processed.
class code:
class StreamLinesReader {
    constructor(stream, onLineFunction, onEnd = undefined) {
        stream.pause();
        this.stream = stream;
        this.onLine = onLineFunction;
        this.onEnd = onEnd;
        this.buffer = [];
        this.line = 0;
        // Decode at the stream level so multibyte UTF-8 characters stay intact
        this.stream.setEncoding('utf8');
        this.stream.on('data', (chunk) => {
            // Pause while processing so a slow handler is not flooded with data
            stream.pause();
            this.processChunk(chunk).then(() => this.stream.resume());
        });
        this.stream.on('end', async () => {
            // Flush the final line if the input did not end with a newline
            if (this.buffer.length) {
                const str = this.buffer.join('');
                await this.onLine(str, this.line++);
            }
            if (this.onEnd) await this.onEnd();
            this.ended = true;
            if (this.resolveWait) this.resolveWait();
        });
        this.stream.resume();
    }

    async processChunk(chunk) {
        const newlines = /\r\n|\n/;
        const lines = chunk.split(newlines);
        if (lines.length === 1) {
            // No newline in this chunk: keep accumulating the partial line
            this.buffer.push(lines[0]);
            return;
        }
        // Join the buffered partial line with the first line of this chunk
        this.buffer.push(lines[0]);
        const str = this.buffer.join('');
        this.buffer.length = 0;
        await this.onLine(str, this.line++);
        // Process the remaining complete lines in the chunk
        for (let i = 1; i < lines.length - 1; i++) {
            await this.onLine(lines[i], this.line++);
        }
        // Buffer the last piece (might be the beginning of the next line)
        this.buffer.push(lines[lines.length - 1]);
    }

    // optional:
    waitEnd() {
        // Return a promise that resolves once the stream is fully processed;
        // resolve immediately if 'end' has already fired
        return new Promise((resolve) => {
            if (this.ended) return resolve();
            this.resolveWait = resolve;
        });
    }
}
example usage:
session.on('pty', (accept, reject, info) => {
    accept();
    session.on('shell', (accept, reject) => {
        const stream = accept();
        const onLineFunction = async (line, lineNumber) => {
            console.log(lineNumber, "line ", line);
            if (line === 'exit') {
                stream.end();
                // Assuming conn is a connection variable defined elsewhere
                conn.end();
            }
        };
        const onEndFunction = async () => {
            console.log("Stream has ended");
        };
        new StreamLinesReader(stream, onLineFunction, onEndFunction);
        const OUTPUT = 'shell output!\n';
        stream.write(OUTPUT);
    });
});
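For the large-files-piped-into-stdin case mentioned above, a minimal sketch (run as, say, `cat all_titles.txt | node app.js`, where app.js is a hypothetical script name):

const onLine = async (line, lineNumber) => {
    console.log(lineNumber, "line ", line);
};
const reader = new StreamLinesReader(process.stdin, onLine);
// waitEnd() resolves after the final line (and onEnd, if given) has been handled
reader.waitEnd().then(() => console.error('done'));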
My old synchronous code was (it reads a stream line by line and should likewise be good for large files piped into stdin):
var n = 0;
function on_line(line, cb) {
    ////one each line
    console.log(n++, "line ", line);
    return cb();
    ////end of one each line
}

var fs = require('fs');
var readStream = fs.createReadStream('all_titles.txt');
//var readStream = process.stdin;
readStream.pause();
readStream.setEncoding('utf8');

var buffer = [];
readStream.on('data', (chunk) => {
    const newlines = /[\r\n]+/;
    var lines = chunk.split(newlines);
    if (lines.length == 1) {
        buffer.push(lines[0]);
        return;
    }
    buffer.push(lines[0]);
    var str = buffer.join('');
    buffer.length = 0;
    readStream.pause();
    on_line(str, () => {
        var i = 1, l = lines.length - 1;
        i--;
        function while_next() {
            i++;
            if (i < l) {
                return on_line(lines[i], while_next);
            } else {
                // keep the trailing partial line for the next chunk
                buffer.push(lines.pop());
                lines.length = 0;
                return readStream.resume();
            }
        }
        while_next();
    });
}).on('end', () => {
    if (buffer.length) {
        var str = buffer.join('');
        buffer.length = 0;
        on_line(str, () => {
            ////after end
            console.error('done');
            ////end after end
        });
    }
});
readStream.resume();