Google Speech - Streaming Request Returns EOF Error

Using Go, I'm taking an RTMP stream, transcoding it to FLAC (using ffmpeg), and attempting to stream it to Google's Speech API to transcribe the audio. However, I keep getting EOF errors when sending the data. I can't find any information on this error in the docs, so I'm not sure what's causing it.

I'm chunking the received data into 3s clips (length isn't relevant as long as it's less than the maximum length of a streaming recognition request).

Here is the core of my code:

func main() {

    done := make(chan os.Signal)
    received := make(chan []byte)

    go receive(received)
    go transcribe(received)

    signal.Notify(done, os.Interrupt, syscall.SIGTERM)

    select {
    case <-done:
        os.Exit(0)
    }
}

func receive(received chan<- []byte) {
    var b bytes.Buffer
    stdout := bufio.NewWriter(&b)

    cmd := exec.Command("ffmpeg", "-i", "rtmp://127.0.0.1:1935/live/key", "-f", "flac", "-ar", "16000", "-")
    cmd.Stdout = stdout

    if err := cmd.Start(); err != nil {
        log.Fatal(err)
    }

    duration, _ := time.ParseDuration("3s")
    ticker := time.NewTicker(duration)

    for {
        select {
        case <-ticker.C:
            stdout.Flush()
            log.Printf("Received %d bytes", b.Len())
            received <- b.Bytes()
            b.Reset()
        }
    }
}

func transcribe(received <-chan []byte) {
    ctx := context.TODO()

    client, err := speech.NewClient(ctx)
    if err != nil {
        log.Fatal(err)
    }

    stream, err := client.StreamingRecognize(ctx)
    if err != nil {
        log.Fatal(err)
    }

    // Send the initial configuration message.
    if err = stream.Send(&speechpb.StreamingRecognizeRequest{
        StreamingRequest: &speechpb.StreamingRecognizeRequest_StreamingConfig{
            StreamingConfig: &speechpb.StreamingRecognitionConfig{
                Config: &speechpb.RecognitionConfig{
                    Encoding:        speechpb.RecognitionConfig_FLAC,
                    LanguageCode:    "en-GB",
                    SampleRateHertz: 16000,
                },
            },
        },
    }); err != nil {
        log.Fatal(err)
    }

    for {
        select {
        case data := <-received:
            if len(data) > 0 {
                log.Printf("Sending %d bytes", len(data))
                if err := stream.Send(&speechpb.StreamingRecognizeRequest{
                    StreamingRequest: &speechpb.StreamingRecognizeRequest_AudioContent{
                        AudioContent: data,
                    },
                }); err != nil {
                    log.Printf("Could not send audio: %v", err)
                }
            }
        }
    }
}

Running this code gives this output:

2017/10/09 16:05:00 Received 191704 bytes
2017/10/09 16:05:00 Saving 191704 bytes
2017/10/09 16:05:00 Sending 191704 bytes
2017/10/09 16:05:00 Could not send audio: EOF

2017/10/09 16:05:03 Received 193192 bytes
2017/10/09 16:05:03 Saving 193192 bytes
2017/10/09 16:05:03 Sending 193192 bytes
2017/10/09 16:05:03 Could not send audio: EOF

2017/10/09 16:05:06 Received 193188 bytes
2017/10/09 16:05:06 Saving 193188 bytes
2017/10/09 16:05:06 Sending 193188 bytes // Notice that this doesn't error

2017/10/09 16:05:09 Received 191704 bytes
2017/10/09 16:05:09 Saving 191704 bytes
2017/10/09 16:05:09 Sending 191704 bytes
2017/10/09 16:05:09 Could not send audio: EOF

Notice that not all of the Sends fail.

Could anyone point me in the right direction here? Is it something to do with the FLAC headers? I also wonder whether resetting the buffer causes some of the data to be dropped (i.e. it's a non-trivial operation that takes some time to complete) and the API doesn't like the missing data.

Any help would be really appreciated.

Schach answered 9/10, 2017 at 16:17

Comments (2):

This is a totally random thought, but have you tried running it with the race detector? It probably won't show anything, but you never know... (Doorsill)

@MiloChristiansen good call! There is a race condition. I tried to create a thread-safe version of bufio.Writer, but the library uses ReadFrom, which continuously reads from the reader. It only releases the lock when everything has been read :-/ (Schach)
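
One possible way around the locking problem described in the comment above is a small wrapper that guards a bytes.Buffer with a mutex and exposes only Write. Because the wrapper has no ReadFrom method, io.Copy (which os/exec uses when Stdout is not an *os.File) falls back to calling Write once per chunk, so the lock is only held briefly. The type safeBuffer and its Drain method are hypothetical names for this sketch, not part of the original code. Drain also returns a copy, since the slice returned by b.Bytes() aliases the buffer and can be overwritten after Reset.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// safeBuffer is a hypothetical helper: a bytes.Buffer guarded by a mutex.
// It deliberately implements only io.Writer (no ReadFrom), so io.Copy
// calls Write per chunk and never holds the lock for the whole stream.
type safeBuffer struct {
	mu  sync.Mutex
	buf bytes.Buffer
}

// Write appends p to the buffer under the lock.
func (s *safeBuffer) Write(p []byte) (int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.buf.Write(p)
}

// Drain returns a copy of everything buffered so far and empties the
// buffer. Copying matters: bytes.Buffer.Bytes aliases internal storage,
// which later writes may overwrite after Reset.
func (s *safeBuffer) Drain() []byte {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make([]byte, s.buf.Len())
	copy(out, s.buf.Bytes())
	s.buf.Reset()
	return out
}

func main() {
	var sb safeBuffer
	var wg sync.WaitGroup

	// Simulate a concurrent producer (ffmpeg's stdout copy in the question).
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sb.Write([]byte("abcd"))
		}()
	}
	wg.Wait()

	first := sb.Drain()
	sb.Write([]byte("xy"))
	fmt.Println(len(first), len(sb.Drain()))
}
```

In the question's receive function this would mean setting cmd.Stdout to a *safeBuffer and sending Drain() on each tick instead of b.Bytes() followed by b.Reset(). It addresses the data race only; it does not by itself change how much audio is sent per request.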

So, it turns out there's a way of getting more information about the status of the stream, so we don't just have to rely on the returned error.

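if err := stream.Send(&speechpb.StreamingRecognizeRequest{
    StreamingRequest: &speechpb.StreamingRecognizeRequest_AudioContent{
        AudioContent: data,
    },
}); err != nil {
    // Send only reports EOF once the stream has been terminated;
    // Recv is what surfaces the reason.
    resp, recvErr := stream.Recv()
    if recvErr != nil {
        log.Printf("Could not send audio: %v (recv: %v)", err, recvErr)
    } else {
        log.Printf("Could not send audio: %v", resp.GetError())
    }
}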

This prints:

2017/10/16 17:14:53 Could not send audio: code:3 message:"Invalid audio content: too long."

Which is a far more helpful error message!
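
If the "too long" message refers to the size of a single audio payload (an assumption; the message itself does not say which limit was exceeded), one option is to split each buffer into smaller pieces and send them one by one. The sketch below shows only the splitting step. The helper splitChunks and the maxChunk value of 32 KiB are hypothetical and chosen for illustration, so check the current API limits before relying on them.

```go
package main

import "fmt"

// maxChunk is an assumed, illustrative payload size, not a documented limit.
const maxChunk = 32 * 1024

// splitChunks slices data into consecutive pieces of at most size bytes.
// The pieces share memory with data; nothing is copied.
func splitChunks(data []byte, size int) [][]byte {
	if size <= 0 {
		return nil
	}
	var chunks [][]byte
	for len(data) > size {
		// The three-index slice caps capacity so that appending to a
		// chunk cannot overwrite the bytes of the next one.
		chunks = append(chunks, data[:size:size])
		data = data[size:]
	}
	if len(data) > 0 {
		chunks = append(chunks, data)
	}
	return chunks
}

func main() {
	// 191704 is the size of the first failing payload in the log above.
	data := make([]byte, 191704)
	chunks := splitChunks(data, maxChunk)
	fmt.Println(len(chunks), len(chunks[len(chunks)-1]))
}
```

Each element of the result could then be passed as AudioContent in its own stream.Send call, in order, in place of the single large send.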

Schach answered 16/10, 2017 at 16:15