How to defer stream read invocation

I'm still trying to grok my way through streams in general. I have been able to stream a large file using multiparty from within form.on('part'). But I need to defer the invocation and resolve with the stream before it's read. I have tried PassThrough, through, and through2, with different results: mostly it hangs, and I can't figure out what to do or how to debug it. I'm open to all alternatives. Thanks for any insights.

import multiparty from 'multiparty'
import {
  PassThrough
} from 'stream';
import through from 'through'
import through2 from 'through2'

export function promisedMultiparty(req) {
  return new Promise((resolve, reject) => {

    const form = new multiparty.Form()
    const form_files = []
    let q_str = ''

    form.on('field', (fieldname, value) => {
      if (value) q_str = appendQStr(fieldname, value, q_str)
    })

    form.on('part', async (part) => {
      if (part.filename) {

        const pass1 = new PassThrough() // this hangs at 10% 

        const pass2 = through(function write(data) { // this hangs from the beginning
            this.queue(data)
          },
          function end() {
            this.queue(null)
          })

        const pass3 = through2() // this hangs at 10%

        /* 
            // This way works for large files, but I want to defer 
            // invocation

            const form_data = new FormData()
            form_data.append(savepath, part, {
              filename,
            })

            const r = request.post(url, {
              headers: {
                'transfer-encoding': 'chunked'
              }
            }, responseCallback(resolve))
            r._form = form

        */

        form_files.push({
          part: part.pipe(pass1),
          // part: part.pipe(pass2),
          // part: part.pipe(pass3),
        })

      } else {
        part.resume()
      }
    })

    form.on('close', () => {
      resolve({
        fields: qs.parse(q_str),
        forms: form_files,
      })
    })

    form.parse(req)
  })
}

p.s. For sure the title could be better, if someone could use the proper terms please. Thanks.

Cyclamen answered 13/9, 2018 at 19:2 Comment(6)
I should point out that PassThrough and through2 work with smaller files. Do PassThrough and through2 behave the same way (hanging at 10%) because they are both based on Streams2? - Cyclamen
Could you explain a little bit more what you want to do? - Millard
Thanks for having a gander @F.bernal. Instead of sending the request from within form.on('part'), I want to resolve with the unread stream, and start streaming with additional context in the function where the promise resolves. - Cyclamen
Otherwise, any tips on how to debug or expose the stream after multiparty would be great. - Cyclamen
Correct me if I'm wrong: you want multiparty to do its job of giving you the file's chunks, and the file streaming to start at the point where your function promisedMultiparty is called? - Millard
Yeah @F.bernal. Thanks for the reply. - Cyclamen

I believe this is because you are not using through2 correctly, i.e. nothing empties its buffer once it fills up. That is why it hangs at about 10% on bigger files but works on smaller ones: a small file fits in the buffer entirely, while a large one stalls as soon as the buffer is full.

I believe an implementation like this should do it:

const pass2 = through2(function(chunk, encoding, next) {

   // do something with the data in `chunk`

   // Use this only if you want to send the data on to another stream reader
   // Note - From your implementation you don't seem to need it
   // this.push(chunk)

   // This is what tells through2 that the chunk has been handled, so it can
   // empty the buffer and read more data
   next();
})
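
To see the buffer filling up in isolation, here is a minimal sketch that uses only Node's built-in stream module. The names makeSource and observe, the chunk sizes, and the 200 ms wait are all assumptions made for the illustration, not part of multiparty or through2. It pipes a fake "large file" into a PassThrough and reports how far the source got, once with nothing reading the PassThrough and once with a sink attached.

```javascript
const { PassThrough, Readable, Writable } = require('stream');

const CHUNK_SIZE = 64 * 1024; // assumed chunk size for the demo
const TOTAL_CHUNKS = 100;     // about 6.4 MB in total

// Hypothetical stand-in for a large upload.
function makeSource() {
  let produced = 0;
  const source = new Readable({
    read() {
      if (produced < TOTAL_CHUNKS) {
        produced += 1;
        this.push(Buffer.alloc(CHUNK_SIZE));
      } else {
        this.push(null); // signal end of data
      }
    },
  });
  // Expose the counter so the caller can see how far the source got.
  source.produced = () => produced;
  return source;
}

// Pipes the source into a PassThrough and reports whether the source
// reached 'end' within waitMs, and how many chunks it had produced.
function observe(attachConsumer, waitMs = 200) {
  return new Promise((resolve) => {
    const source = makeSource();
    const pass = new PassThrough();
    const report = (ended) => resolve({ produced: source.produced(), ended });

    const timer = setTimeout(() => report(false), waitMs);
    source.on('end', () => {
      clearTimeout(timer);
      report(true);
    });

    source.pipe(pass);
    if (attachConsumer) {
      // A sink that discards every chunk, which keeps the buffers draining.
      pass.pipe(new Writable({
        write(chunk, encoding, done) {
          done();
        },
      }));
    }
  });
}
```

Calling observe(false) should report that the source stopped after a handful of chunks and never ended, while observe(true) should report all chunks produced. The exact stall point depends on the highWaterMark defaults of the Node version in use, which is one plausible reason the hang shows up at a fixed percentage of a given file.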
Frigate answered 8/10, 2018 at 21:25 Comment(1)
That was the missing piece of logic. Thank you!Cyclamen
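
For the goal described in the comments (resolving with the unread stream and starting the read later), one possible approach is to resolve the promise on the 'part' event rather than on 'close'. The sketch below does not use multiparty itself: fakeForm is a hypothetical stand-in that emits 'close' only after its part has been fully read, which is an assumption about how a streaming form parser behaves. The helpers firstPart and consume are likewise made up for the illustration.

```javascript
const { EventEmitter } = require('events');
const { Readable, Writable } = require('stream');

// Hypothetical stand-in for a form parser: it emits a single 'part'
// stream and emits 'close' only once that part has been fully read.
function fakeForm(chunks = 50) {
  const form = new EventEmitter();
  form.parse = () => {
    let sent = 0;
    const part = new Readable({
      read() {
        if (sent < chunks) {
          sent += 1;
          this.push(Buffer.alloc(64 * 1024));
        } else {
          this.push(null);
        }
      },
    });
    part.on('end', () => form.emit('close'));
    form.emit('part', part);
  };
  return form;
}

// Resolve as soon as the part exists, handing the caller an unread stream.
// Waiting for 'close' here instead would never resolve, because 'close'
// depends on somebody reading the part first.
function firstPart(form) {
  return new Promise((resolve, reject) => {
    form.once('part', resolve);
    form.once('error', reject);
    form.parse();
  });
}

// The caller decides when reading starts; this one just counts bytes.
async function consume(form) {
  const part = await firstPart(form); // still paused, nothing read yet
  let bytes = 0;
  await new Promise((resolve, reject) => {
    part.on('error', reject);
    part
      .pipe(new Writable({
        write(chunk, encoding, done) {
          bytes += chunk.length;
          done();
        },
      }))
      .on('finish', resolve);
  });
  return bytes;
}
```

With this shape, consume(fakeForm()) resolves with the total byte count once the caller's own sink has finished. The trade-off is that the promise settles before later fields or parts arrive, so a form with several parts would need a different shape, for example a callback or an async iterator per part.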
