Ktor response streaming
Asked Answered
A

1

6

I am trying to call a twitter endpoint that gives you a constant streams of json results back to the client

https://documenter.getpostman.com/view/9956214/T1LMiT5U#977c147d-0462-4553-adfa-d7a1fe59c3ec

I try to make a call to the endpoint like this

        val url = "https://api.twitter.com/2/tweets/search/stream"
        _streamChannel = _client.get<ByteReadChannel>(token, url) //Stops here

        val byteBufferSize = 1024
        val byteBuffer = ByteArray(byteBufferSize)

        _streamChannel?.let {
            while (_streamChannel!!.availableForRead > 0) {
                _streamChannel!!.readAvailable(byteBuffer, 0, byteBufferSize)
                val s = String(byteBuffer)
                parseStreamResponseString(s).forEach {
                    emit(Response.Success(it))
                }
            }
        }

my client.get code is this

suspend inline fun <reified T> get(authKey: String, url: String): T? {
    val response = _client.get<HttpResponse>(url) {
        header("Authorization", "Bearer $authKey")
    }

    when (response.status.value) {
        in 300..399 -> throw RedirectResponseException(response)
        in 400..499 -> throw ClientRequestException(response)
        in 500..599 -> throw ServerResponseException(response)
    }

    if (response.status.value >= 600) {
        throw ResponseException(response)
    }

    return response.receive<T>()
}

When I make the request it just sits there in what I am assuming is waiting for the full response to be returned before giving it to me.

Edit

I also tried using scoped streaming but it just sits at the line readAvailable I know there are messages coming through because when I run the request via cURL I am constantly getting data

    _client.get<HttpStatement> {
        header("Authorization", "Bearer $authKey")
        url(urlString)
        contentType(ContentType.Application.Json)
        method = HttpMethod.Get
    }.execute {
        val streamChannel = it.receive<ByteReadChannel>()
        val byteBufferSize = 1024
        val byteBuffer = ByteArray(byteBufferSize)
        streamChannel.readAvailable(byteBuffer, 0, byteBufferSize) // Stops here
        val s = String(byteBuffer)
    }

How do I process a constant stream of json data using Ktor?

Aventurine answered 14/11, 2020 at 3:59 Comment(0)
B
2

As far as I am aware, the Ktor client does note expose access to the IO buffer of the request in the way that twitter's streaming API requires.

From the twitter documentation here:

Some HTTP client libraries only return the response body after the connection has been closed by the server. These clients will not work for accessing the Streaming API. You must use an HTTP client that will return response data incrementally. Most robust HTTP client libraries will provide this functionality. The Apache HttpClient will handle this use case, for example.

What you are doing is telling Ktor that the thing you are getting is a ByteReadChannel, and so, once the request closes (which will never happen with this twitter endpoint) the Ktor client would attempt to use whatever plugin (json for example) you were using to parse that data into a ByteReadChannel. It would also not be able to do that, because the data you are getting from twitter is not a ByteReadChannel, it is a new line seperated list of json objects.

Bur answered 26/9, 2021 at 5:47 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.