Suspended function to read from InputStream

I'm fairly new to coroutines, therefore I wanted to ask for an opinion.

I've created an extension function to read data from the InputStream:

suspend fun InputStream.readData(): ByteArray {
    return withContext(Dispatchers.IO) {
        while (available() == 0) {
            delay(10)
        }
        val count = available()
        val buffer = ByteArray(count)
        read(buffer, 0, count)
        return@withContext buffer
    }
}

Do you think there is anything I could improve from the coroutines point of view?

Tony answered 10/1, 2019 at 1:5 Comment(0)
while (available() == 0) {
    delay(10)
}

Here you hope you've achieved non-blocking IO using the InputStream. You imagine that the data would be somehow "trickling in" on its own and you could just wait for it to become available so you can pick it up without blocking in the subsequent read() call.

This behavior isn't universal across InputStream implementations. In fact, it probably only works with a SocketInputStream, and even there it has issues: when the remote end has closed the connection, available() will keep returning 0 until you make another read() call and observe that the socket is closed.

In other implementations of InputStream, available() will always return 0 unless the stream is buffered, in which case it will simply tell you how much is left in the buffer. When the buffer is empty, the input stream implementation won't try to fetch any more data from the underlying resource until you call read().
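
A minimal sketch of that point, not a general rule for every stream class. PlainStream and availabilityReport are hypothetical names made up for this illustration; PlainStream simply inherits the default InputStream.available(), which returns 0.

```kotlin
import java.io.BufferedInputStream
import java.io.InputStream

// Hypothetical stream used only for this illustration: it serves `remaining`
// bytes but inherits the default available(), which always returns 0.
class PlainStream(private var remaining: Int) : InputStream() {
    override fun read(): Int {
        if (remaining == 0) return -1 // end of stream
        remaining--
        return 42
    }
}

// Returns available() as seen (1) on the plain stream, (2) on a buffered
// wrapper before any read, and (3) on the same wrapper after one read().
fun availabilityReport(): List<Int> {
    val plain = PlainStream(5)
    val buffered = BufferedInputStream(PlainStream(5))
    val beforeRead = buffered.available() // nothing has been buffered yet
    buffered.read()                       // the first read fills the internal buffer
    val afterRead = buffered.available()  // bytes still sitting in the buffer
    return listOf(plain.available(), beforeRead, afterRead)
}
```

Calling availabilityReport() on a standard JDK should give [0, 0, 4]: the stream reports nothing until a read() has pulled data into the buffer, so a loop that waits for available() > 0 would spin forever on it.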

Therefore I'd suggest at least narrowing down the receiver of your function to SocketInputStream, but for full correctness you should use NIO code instead.

Finally, if you find that for your specific use case the available() loop does work as expected and read() never blocks, then you should drop withContext(IO), because it implies two costly context switches (to a background thread and back) and its only purpose is to run blocking code off the GUI thread.
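
For reference, here is a rough sketch of what the NIO route could look like, assuming the connection is opened as an AsynchronousSocketChannel rather than a plain Socket and that kotlinx.coroutines is on the classpath. readSuspending and readChunk are hypothetical helper names, not library functions, and the 4096-byte buffer size is an arbitrary choice.

```kotlin
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousSocketChannel
import java.nio.channels.CompletionHandler
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.suspendCancellableCoroutine

// Hypothetical helper: suspends until the channel delivers some bytes.
// No thread is blocked while waiting, so no withContext(Dispatchers.IO) is needed.
suspend fun AsynchronousSocketChannel.readSuspending(buffer: ByteBuffer): Int =
    suspendCancellableCoroutine { cont ->
        read(buffer, Unit, object : CompletionHandler<Int, Unit> {
            override fun completed(result: Int, attachment: Unit) {
                cont.resume(result) // number of bytes read, or -1 at end of stream
            }

            override fun failed(exc: Throwable, attachment: Unit) {
                cont.resumeWithException(exc)
            }
        })
        // On cancellation (for example from withTimeout) close the channel,
        // which aborts the pending read.
        cont.invokeOnCancellation { runCatching { close() } }
    }

// Hypothetical convenience wrapper: returns whatever arrived in one read,
// or an empty array once the peer has closed the connection.
suspend fun readChunk(channel: AsynchronousSocketChannel): ByteArray {
    val buffer = ByteBuffer.allocate(4096) // assumed buffer size
    val n = channel.readSuspending(buffer)
    if (n <= 0) return ByteArray(0)
    buffer.flip()
    return ByteArray(n).also { buffer.get(it) }
}
```

Note the trade-off in this sketch: cancelling the read closes the channel, so it suits the case where a timeout means the connection is abandoned anyway.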

Ilka answered 10/1, 2019 at 8:28 Comment(3)
The code I posted is for reading from a socket. The problem with the read() operation is that it is blocking and can't be cancelled. I'm going to use the posted code inside a withTimeout block, which is the reason for the delay(10). - Tony
For all I know, the available() mechanism is useless in any case. It never worked for me, and I tried it several times. You should use non-blocking network IO and run it in the Main dispatcher. - Ilka
I think it all depends on how one wants to use the InputStream. In my case I'm going to read until I find the information I'm interested in (or the read times out); I won't be reading until the socket is closed. Plus, according to the documentation, available() returns the number of bytes that can be read without blocking. - Tony

Your code seems OK from the coroutines point of view; there is nothing to improve. Just call the function from a coroutine builder: launch if you want concurrency, or async if you want parallelism. For example:

yourScope.launch {

    val inputStream = BufferedInputStream(FileInputStream("filename"))
    val result = inputStream.use {
        it.readData()
    }

    // use ByteArray result
}

Additionally, you can shorten the code a little by replacing return@withContext buffer with buffer and making withContext(Dispatchers.IO) the function's expression body:

suspend fun InputStream.readData(): ByteArray = withContext(Dispatchers.IO) {
    while (available() == 0) {
        delay(10)
    }
    val count = available()
    val buffer = ByteArray(count)
    read(buffer, 0, count)
    buffer
}
Joejoeann answered 10/1, 2019 at 7:36 Comment(1)
Is there no support for something like this in stdlib? - Stephi

In addition to Marko's answer, I'd like to point out that the fact that you cannot turn blocking code into non-blocking code just by using coroutines doesn't mean you shouldn't use coroutines at all. It makes sense to use them for their other benefits:

  1. They let you keep a sequential style for asynchronous code. If a task consists of several steps, you don't need special reactive types and their combinators.
  2. They provide a great way to scale task execution. With multiple tasks, you can run them in a structured way in several contexts and on different dispatchers.
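
Both points can be sketched roughly as follows, assuming kotlinx.coroutines is available. countLines and countLinesInAll are hypothetical functions made up for the illustration; the split between Dispatchers.IO and Dispatchers.Default is one reasonable choice, not the only one.

```kotlin
import java.io.File
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.withContext

// Point 1: two steps written sequentially, each on a suitable dispatcher.
// The file read still blocks a thread, but it is an IO thread, not the caller's.
suspend fun countLines(file: File): Int {
    val bytes = withContext(Dispatchers.IO) { file.readBytes() }
    return withContext(Dispatchers.Default) {
        bytes.count { it == '\n'.code.toByte() }
    }
}

// Point 2: many tasks started concurrently inside one structured scope.
// If any task fails, or the caller is cancelled, the remaining ones are cancelled too.
suspend fun countLinesInAll(files: List<File>): Int = coroutineScope {
    files.map { file -> async { countLines(file) } }.awaitAll().sum()
}
```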

Hope this helps to comprehend the whole picture.

Stenophyllous answered 11/1, 2019 at 19:11 Comment(0)
