What is the best way to get backpressure for Cassandra Writes?

I have a service that consumes messages off of a queue at a rate that I control. I do some processing and then attempt to write to a Cassandra cluster via the DataStax Java driver. I have set up my Cassandra cluster with maxRequestsPerConnection and maxConnectionsPerHost. However, in testing I have found that once I have reached maxConnectionsPerHost and maxRequestsPerConnection, calls to session.executeAsync don't block.

What I am doing right now is using a new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection), acquiring a permit before every async request and releasing it when the future returned by executeAsync completes. This works well enough, but it seems redundant, since the driver is already tracking requests and connections internally.

Has anyone come up with a better solution to this problem?

One caveat: I would like a request to be considered outstanding until it has completed. This includes retries! The situation where I am getting retryable failures from the cluster (such as timeouts waiting for consistency) is the primary situation in which I want to apply backpressure and stop consuming messages from the queue.

Problem:

// the rate at which I consume messages depends on how fast this method returns
processMessage(message) {
    // this appears to return immediately even if I have exhausted connections/requests
    session.executeAsync(preparedStatement.bind(...));
}

Current solution:

constructor() {
    this.concurrentRequestsSemaphore = new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection);
}

processMessage(message) {
    // acquire a permit *before* submitting, so this call blocks once saturated
    concurrentRequestsSemaphore.acquireUninterruptibly();
    ResultSetFuture resultSetFuture = session.executeAsync(preparedStatement.bind(...));
    // helper that adapts the driver's ListenableFuture to a CompletableFuture
    CompletableFuture<ResultSet> future = completableFromListenable(resultSetFuture);
    // release on success or failure so permits are not leaked
    future.whenComplete((result, exception) -> concurrentRequestsSemaphore.release());
}

Also, can anyone see any obvious problems with this solution?

Lati answered 10/2, 2016 at 19:08 Comment(1)
I think your use case as described would greatly benefit from RxJava; it makes retries trivial and backpressure doable. – Anyway

One possible idea, so as not to kill the cluster, is to "throttle" your calls to executeAsync: after a batch of 100 (or whatever number is best for your cluster and workload), sleep in the client code and block on all 100 futures (or use the Guava library to transform a list of futures into a future of a list).

This way, after issuing 100 async queries, you force the client application to wait for all of them to succeed before proceeding further. If you catch an exception when calling future.get(), you can schedule a retry. Normally a retry has already been attempted by the default RetryPolicy of the Java driver.
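A minimal sketch of that batching idea, assuming the 3.x DataStax Java driver (whose ResultSetFuture is a Guava ListenableFuture); the window size of 100 and the writeInBatches name are illustrative, not from the original answer:

import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.google.common.util.concurrent.Futures;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

// issue async writes in windows of 100, blocking between windows
void writeInBatches(Session session, List<Statement> statements)
        throws InterruptedException, ExecutionException {
    List<ResultSetFuture> window = new ArrayList<>();
    for (Statement stmt : statements) {
        window.add(session.executeAsync(stmt));
        if (window.size() == 100) {
            // Guava: list of futures -> future of a list; get() blocks on all
            Futures.allAsList(window).get();
            window.clear();
        }
    }
    Futures.allAsList(window).get();  // wait for the final partial window
}

Note that Futures.allAsList fails fast if any input future fails, so a failed window surfaces as an ExecutionException that you can use to schedule a retry, as described above.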

About back-pressure signals from the server: starting from CQL binary protocol v3, there is an error code that notifies the client that the coordinator is overloaded: https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v3.spec#L951

From the client, you can get this overloaded information in two ways: the driver surfaces it as an OverloadedException thrown back to the caller, and you can intercept it in a custom RetryPolicy to apply a back-off (see the sketch below).
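As a sketch of the interception route, assuming driver 3.1+ (where RetryPolicy gained onRequestError(), as noted in the comments below); BackoffRetryPolicy is a made-up name, and everything except the overloaded case delegates to the driver's DefaultRetryPolicy:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.WriteType;
import com.datastax.driver.core.exceptions.DriverException;
import com.datastax.driver.core.exceptions.OverloadedException;
import com.datastax.driver.core.policies.DefaultRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;

// Hypothetical wrapper policy: react when the coordinator says it is
// overloaded, delegate everything else to the default policy.
public class BackoffRetryPolicy implements RetryPolicy {
    private final RetryPolicy delegate = DefaultRetryPolicy.INSTANCE;

    @Override
    public RetryDecision onRequestError(Statement statement, ConsistencyLevel cl,
                                        DriverException e, int nbRetry) {
        if (e instanceof OverloadedException) {
            // a good place to flip a backpressure flag that pauses the consumer
            return RetryDecision.tryNextHost(cl);
        }
        return delegate.onRequestError(statement, cl, e, nbRetry);
    }

    @Override
    public RetryDecision onReadTimeout(Statement statement, ConsistencyLevel cl,
                                       int required, int received, boolean dataRetrieved, int nbRetry) {
        return delegate.onReadTimeout(statement, cl, required, received, dataRetrieved, nbRetry);
    }

    @Override
    public RetryDecision onWriteTimeout(Statement statement, ConsistencyLevel cl, WriteType writeType,
                                        int requiredAcks, int receivedAcks, int nbRetry) {
        return delegate.onWriteTimeout(statement, cl, writeType, requiredAcks, receivedAcks, nbRetry);
    }

    @Override
    public RetryDecision onUnavailable(Statement statement, ConsistencyLevel cl,
                                       int requiredReplica, int aliveReplica, int nbRetry) {
        return delegate.onUnavailable(statement, cl, requiredReplica, aliveReplica, nbRetry);
    }

    @Override public void init(Cluster cluster) { }
    @Override public void close() { }
}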

Avicenna answered 10/2, 2016 at 22:38 Comment(5)
I believe doing batches of 100 would result in a lot of unused bandwidth, since I would have to wait for all 100 requests to come back before I started the next request. At the end of the wait, I would have an empty pipe that could be full of 99 things. – Lati
The overloaded bit is cool. I would love to extend my retry policy to back off when the cluster is overloaded; that would allow me to retry more aggressively when it isn't overloaded, without fear of breaking the cluster. – Lati
Yes, in the latest Java driver version, you can override the onRequestError() method on the RetryPolicy to intercept the OverloadedException and do a manual back-off. – Avicenna
How does this apply when you are using the spark-cassandra connector instead of the standard driver? – Haematoma
When using the Spark/Cassandra connector, the async parallelism is handled for you by the connector itself. – Avicenna

What I am doing right now is using a new Semaphore(maxConnectionsPerHost * maxRequestsPerConnection), acquiring a permit before every async request and releasing it when the future returned by executeAsync completes. This works well enough, but it seems redundant, since the driver is already tracking requests and connections internally.

That is a pretty reasonable approach that allows new requests to fill in while others complete. You can tie releasing a permit to the future's completion.

The reason the driver doesn't do this itself is that it tries to block as little as possible and instead fails fast. Unfortunately, this pushes some responsibility onto the client.

In the usual case it is not good to send that many requests to a host at a time. C* has a native_transport_max_threads setting (default 128) that controls the number of threads handling requests at a time. It would be better to throttle yourself to roughly 2 * that number per host. (See How Cassandra handle blocking execute statement in datastax java driver for more detail.)
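For illustration, a semaphore sized by that rule of thumb; a sketch in which the 128 mirrors the cassandra.yaml default and cluster is an already-built driver Cluster:

import com.datastax.driver.core.Cluster;
import java.util.concurrent.Semaphore;

// cap in-flight requests at roughly 2 * native_transport_max_threads per host
int nativeTransportMaxThreads = 128;  // server default; match your cassandra.yaml
int hostCount = cluster.getMetadata().getAllHosts().size();
Semaphore inFlight = new Semaphore(2 * nativeTransportMaxThreads * hostCount);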

I would like a request to be considered outstanding until it has completed. This includes retries! The situation where I am getting retryable failures from the cluster (such as timeouts waiting for consistency) is the primary situation in which I want to apply backpressure and stop consuming messages from the queue.

The driver will not complete the future until the request has completed successfully, exhausted its retries, or failed for some other reason. Therefore you can tie releasing a semaphore permit to the future completing or failing.
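A minimal sketch of that wiring, using Guava's Futures.addCallback on the driver's ResultSetFuture, assuming the Guava version bundled with the 3.x driver (which still has the two-argument addCallback); session, boundStatement, and the pre-sized inFlight semaphore are assumed from the surrounding discussion:

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;

// acquire before submitting; release when the future completes either way,
// which (per the above) includes driver-level retries
inFlight.acquireUninterruptibly();
ResultSetFuture f = session.executeAsync(boundStatement);
Futures.addCallback(f, new FutureCallback<ResultSet>() {
    @Override public void onSuccess(ResultSet rs) { inFlight.release(); }
    @Override public void onFailure(Throwable t) { inFlight.release(); }
});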

Bandeen answered 10/2, 2016 at 22:45
