Cassandra: How to insert a new wide row with good performance using CQL

I am evaluating cassandra. I am using the datastax driver and CQL.

I would like to store some data with the following internal structure, where the names are different for each update.

+-------+-------+-------+-------+-------+-------+
|       | name1 | name2 | name3 | ...   | nameN |
| time  +-------+-------+-------+-------+-------+
|       | val1  | val2  | val3  | ...   | valN  |
+-------+-------+-------+-------+-------+-------+

So time should be the row key (the partition key), and name should be the column key. The CQL statement I use to create this table is:

CREATE TABLE IF NOT EXISTS test.wide (
  time varchar,
  name varchar,
  value varchar,
  PRIMARY KEY (time,name))
  WITH COMPACT STORAGE

I want the schema to be this way for ease of querying. I also have to occasionally store updates with more than 65000 rows. So using the cassandra list/set/map data types is not an option.

I have to be able to handle at least 1000 wide row inserts per second, with a varying but large (~1000) number of name/value pairs.

The problem is the following: I have written a simple benchmark that does 1000 wide row inserts of 10000 name/value pairs each. I am getting very slow performance with CQL and the datastax driver, whereas the version that does not use CQL (using astyanax) has good performance on the same test cluster.

I have read this related question, and its accepted answer suggests that you should be able to atomically and quickly create a new wide row by using batch prepared statements, which are available in cassandra 2.

So I tried using those, but I still get slow performance (two inserts per second for a small three-node cluster running on localhost). Am I missing something obvious, or do I have to use the lower level thrift API? I have implemented the same insert with a ColumnListMutation in astyanax, and I get about 30 inserts per second.

If I have to use the lower level thrift API:

  • is it actually deprecated, or is it just inconvenient to use because it is lower level?

  • will I be able to query a table created with the thrift api with CQL?

Below is a self-contained code example in scala. It simply creates a batch statement for inserting a wide row with 10000 columns and times the insertion performance repeatedly.

I played with the options of BatchStatement and with the consistency level, but nothing could get me better performance.

The only explanation I have is that despite the batch consisting of prepared statements, the entries are added to the row one by one.


package cassandra

import com.datastax.driver.core._

object CassandraTestMinimized extends App {

  val keyspace = "test"
  val table = "wide"
  val tableName = s"$keyspace.$table"

  def createKeyspace = s"""
CREATE KEYSPACE IF NOT EXISTS ${keyspace}
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
"""

  def createWideTable = s"""
CREATE TABLE IF NOT EXISTS ${tableName} (
time varchar,
name varchar,
value varchar,
PRIMARY KEY (time,name))
WITH COMPACT STORAGE
"""

  def writeTimeNameValue(time: String) = s"""
INSERT INTO ${tableName} (time, name, value)
VALUES ('$time', ?, ?)
"""

  val cluster = Cluster.builder.addContactPoints("127.0.0.1").build
  val session = cluster.connect()

  session.execute(createKeyspace)
  session.execute(createWideTable)

  for(i<-0 until 1000) {
    val entries =
      for {
        i <- 0 until 10000
        name = i.toString
        value = name
      } yield name -> value
    val batchPreparedStatement = writeMap(i, entries)
    val t0 = System.nanoTime()
    session.execute(batchPreparedStatement)
    val dt = System.nanoTime() - t0
    println(i + " " + (dt/1.0e9))
  }

  def writeMap(time: Long, update: Seq[(String, String)]) : BatchStatement = {
    val template = session
      .prepare(writeTimeNameValue(time.toString))
      .setConsistencyLevel(ConsistencyLevel.ONE)
    val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
    for ((k, v) <- update)
      batch.add(template.bind(k, v))
    batch
  }
}

Here is the astyanax code (modified from an astyanax example) that does essentially the same thing 15 times faster. Note that this also does not use asynchronous calls, so it is a fair comparison. It requires the column family to already exist, since I have not yet figured out how to create it using astyanax and the example did not have any code for creating the column family.

package cassandra;

import java.util.Iterator;

import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.serializers.AsciiSerializer;
import com.netflix.astyanax.serializers.LongSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;

public class AstClient {
    private static final Logger logger = LoggerFactory.getLogger(AstClient.class);

    private AstyanaxContext<Keyspace> context;
    private Keyspace keyspace;
    private ColumnFamily<Long, String> EMP_CF;
    private static final String EMP_CF_NAME = "employees2";

    public void init() {
        logger.debug("init()");

        context = new AstyanaxContext.Builder()
                .forCluster("Test Cluster")
                .forKeyspace("test1")
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
                )
                .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool")
                        .setPort(9160)
                        .setMaxConnsPerHost(1)
                        .setSeeds("127.0.0.1:9160")
                )
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setCqlVersion("3.0.0")
                        .setTargetCassandraVersion("2.0.5"))
                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                .buildKeyspace(ThriftFamilyFactory.getInstance());

        context.start();
        keyspace = context.getClient();

        EMP_CF = ColumnFamily.newColumnFamily(
                EMP_CF_NAME,
                LongSerializer.get(),
                AsciiSerializer.get());
    }

    public void insert(long time) {
        MutationBatch m = keyspace.prepareMutationBatch();

        ColumnListMutation<String> x =
                m.withRow(EMP_CF, time);
        for(int i=0;i<10000;i++)
            x.putColumn(Integer.toString(i), Integer.toString(i));

        try {
            @SuppressWarnings("unused")
            Object result = m.execute();
        } catch (ConnectionException e) {
            logger.error("failed to write data to C*", e);
            throw new RuntimeException("failed to write data to C*", e);
        }
        logger.debug("insert ok");
    }

    public void createCF() {
    }

    public void read(long time) {
        OperationResult<ColumnList<String>> result;
        try {
            result = keyspace.prepareQuery(EMP_CF)
                    .getKey(time)
                    .execute();

            ColumnList<String> cols = result.getResult();
            // process data

            // a) iterate over columns
            for (Iterator<Column<String>> i = cols.iterator(); i.hasNext(); ) {
                Column<String> c = i.next();
                String v = c.getStringValue();
                System.out.println(c.getName() + " " + v);
            }

        } catch (ConnectionException e) {
            logger.error("failed to read from C*", e);
            throw new RuntimeException("failed to read from C*", e);
        }
    }

    public static void main(String[] args) {
        AstClient c = new AstClient();
        c.init();
        long t00 = System.nanoTime();
        for(int i=0;i<1000;i++) {
            long t0 = System.nanoTime();
            c.insert(i);
            long dt = System.nanoTime() - t0;
            System.out.println((1.0e9/dt) + " " + i);
        }
        long dtt = System.nanoTime() - t00;

        c.read(0);
        System.out.println(dtt / 1e9);
    }

}

Update: I found this thread on the cassandra-user mailing list. It seems that there is a performance problem with CQL when doing large wide row inserts. There is a ticket CASSANDRA-6737 to track this issue.

Update2: I have tried out the patch that is attached to CASSANDRA-6737, and I can confirm that this patch completely fixes the issue. Thanks to Sylvain Lebresne from DataStax for fixing this so quickly!

Xantha answered 14/2, 2014 at 11:52 Comment(0)
K
5

You are not the only person to experience this. I wrote a blog post a while ago that focuses more on conversion between CQL and Thrift, but it links to mailing list threads from people seeing the same thing (the performance of wide-row inserts was my initial motivation for investigating): http://thelastpickle.com/blog/2013/09/13/CQL3-to-Astyanax-Compatibility.html

In sum - CQL is great for removing the burdens of dealing with typing and understanding the data model for folks new to Cassandra. The DataStax driver is well written and contains lots of useful features.

However, the Thrift API is more than slightly faster for wide row inserts. The Netflix blog does not go into this specific use case so much. Further, the Thrift API is not legacy so long as people are using it (many folks are). It's an ASF project and as such is not run by any single vendor.

In general, with any Cassandra-based application, if you find a way of doing something that meets (or often exceeds) the performance requirements of your workload, stick with it.

Kelliekellina answered 14/2, 2014 at 11:52 Comment(0)
P
8

You have a mistake in your code that I think explains a lot of the performance problems you're seeing: for each batch you prepare the statement again. Preparing a statement isn't super expensive, but doing it as you do adds a lot of latency. The time you spend waiting for that statement to be prepared is time you don't build the batch, and time Cassandra doesn't spend processing that batch. A prepared statement only needs to be prepared once and should be re-used.
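
One way to guarantee a statement is prepared only once is to cache it by its CQL text. The sketch below is an illustration, not driver code: `PreparedStatementCache` is a hypothetical helper, and the `prepare` function passed to it stands in for whatever the driver provides (with the DataStax Java driver that would presumably be `session::prepare`). Note that in the question's code the time value is interpolated into the CQL string, so every row yields a different statement text; a cache like this only helps if time becomes a third `?` bind marker.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical helper: caches prepared statements by CQL text so that each
 * distinct statement is prepared once and then reused.
 * S is the driver's prepared-statement type.
 */
final class PreparedStatementCache<S> {
    private final Map<String, S> cache = new ConcurrentHashMap<>();
    private final Function<String, S> prepare;

    /** prepare is the (assumed) driver call that turns CQL text into a prepared statement. */
    PreparedStatementCache(Function<String, S> prepare) {
        this.prepare = prepare;
    }

    /** Returns the cached statement, calling prepare only on the first request for this CQL. */
    S get(String cql) {
        return cache.computeIfAbsent(cql, prepare);
    }
}
```

With a single statement such as `INSERT INTO test.wide (time, name, value) VALUES (?, ?, ?)`, every batch would then bind three values per entry against the same cached statement.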

I think much of the bad performance can be explained by latency problems. The bottleneck is most likely your application code, not Cassandra. Even if you only prepare that statement once, you still spend most of the time either being CPU bound in the application (building a big batch) or not doing anything (waiting for the network and Cassandra).

There are two things you can do: first of all, use the async API of the CQL driver and build the next batch while the network and Cassandra are busy with the one you just sent; and secondly, try running multiple threads doing the same thing. You will have to experiment to find the right number of threads; it depends on the number of cores you have and on whether you're running one or three nodes on the same machine.
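
The first suggestion can be sketched without any driver dependency. In the sketch below, `BoundedPipeline`, `sendAll`, and `maxInFlight` are hypothetical names, and `sendAsync` is a placeholder for the driver's asynchronous call. It assumes the call returns a `CompletableFuture`; the DataStax driver's `executeAsync` returns its own future type, so an adapter would be needed. The idea is to keep a bounded window of requests in flight instead of waiting for each response before sending the next.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/** Hypothetical helper that pipelines asynchronous requests with a bounded window. */
final class BoundedPipeline {
    /**
     * Sends every request via sendAsync while keeping at most maxInFlight
     * requests outstanding, then returns the responses in request order.
     */
    static <Q, R> List<R> sendAll(List<Q> requests,
                                  Function<Q, CompletableFuture<R>> sendAsync,
                                  int maxInFlight) {
        Semaphore permits = new Semaphore(maxInFlight);
        List<CompletableFuture<R>> pending = new ArrayList<>(requests.size());
        for (Q request : requests) {
            permits.acquireUninterruptibly();            // blocks only when the window is full
            CompletableFuture<R> future = sendAsync.apply(request);
            future.whenComplete((result, error) -> permits.release());
            pending.add(future);
        }
        List<R> results = new ArrayList<>(pending.size());
        for (CompletableFuture<R> future : pending) {
            results.add(future.join());                  // a failed request surfaces here as CompletionException
        }
        return results;
    }
}
```

The window size plays the same role as the thread count: it is something to tune by measurement rather than a value with a known right answer.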

Running a three node cluster on the same machine makes the cluster slower than running a single node, while running on different machines makes it faster. Also running the application on the same machine doesn't exactly help. If you want to test performance, either run only one node or run a real cluster on separate machines.

Batches can give you extra performance, but not always. They can lead to the kind of problem you're seeing in your test code: buffer bloat. Once batches get too big your application spends too much time building them, then too much time pushing them out on the network, and too much time waiting for Cassandra to process them. You need to experiment with batch sizes and see what works best (but do that with a real cluster, otherwise you won't see the effects of the network, which will be a big factor when your batches get bigger).
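
To experiment with batch sizes, the entries of one wide row can be split into smaller chunks that are each sent as their own batch. This is a minimal sketch; `BatchChunks`, `split`, and `batchSize` are hypothetical names and not part of any driver. Keep in mind that several smaller batches are no longer applied as a single unit, which matters if the row has to appear atomically.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for trying out different batch sizes. */
final class BatchChunks {
    /** Splits items into consecutive chunks of at most batchSize elements, preserving order. */
    static <T> List<List<T>> split(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            chunks.add(new ArrayList<>(items.subList(from, to)));  // copy so chunks are independent of the input
        }
        return chunks;
    }
}
```

Each chunk would then become one batch, and the chunk size is the knob to vary while measuring throughput on a real cluster.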

And if you use batches, use compression. Compression makes no difference in most request loads (responses are another matter), but when you send huge batches it can make a big difference.

There's nothing special about wide row writes in Cassandra. With some exceptions the schema doesn't change the time it takes to process a write. I run applications that do tens of thousands of non-batched mixed wide-row and non-wide-row writes per second. The clusters aren't big, just three or four m1.xlarge EC2 nodes each. The trick is never to wait for a request to return before sending the next (that doesn't mean fire and forget, just handle the responses in the same asynchronous manner). Latency is a performance killer.

Piste answered 16/2, 2014 at 10:20 Comment(8)
I don't think building the batch is the culprit. But note that to make sure I don't measure the time building the batch, the timing measures just the execution of the batch, but not the building. I am aware that running a three node cluster on localhost is not the best thing for performance. I am also aware that I would be able to get some improvements by tweaking some settings. But none of that explains that I get 15 times better performance using the Thrift API on the same three node cluster running on localhost.Galvani
I think you're measuring the wrong thing. Rewrite the test code to be multi threaded, do all operations asynchronously and measure the number of writes you get through over a couple of minutes. Measuring the latency of a batch doesn't tell you enough about the actual performance.Piste
But then how come the non-CQL version that also isn't multithreaded performs so much better? Also, what exactly could cause one second of latency on localhost? I think the problem is that CQL does not figure out that all inserts affect the same row key, whereas with the astyanax API I can precisely specify that I want a big update for one row key.Galvani
Could you explain a bit more what your actual measurements are? How long does it take on average to insert a batch of 10,000 items? If you do the same thing with a ColumnListMutation and measure only the time it takes to send and receive the response to that operation, how long does that take? You say "I have to be able to handle at least 1000 wide row inserts per second" but also "I still get absolutely awful performance [...] two inserts per second". Do you mean two batches of 10,000 per second (which would be 20K inserts/s, well above your target). Could you please clarify?Piste
I have modified the code to use compression and to use executeAsync. Does not make a difference. Firing off the futures is almost instantaneous, but 1000 inserts with 10000 columns each takes 545s with CQL, whereas the non-CQL-Version (also modified to be asynchronous) takes 11s.Galvani
I ran a test on my machine using your CQL, in Ruby using the Ruby driver and I can run a batch of 10,000 prepared statement writes in ~1.5s, but I can also run 10,000 async requests in ~1.5s. When you say your 1000 batches of 10000 take 545s, do you just add measurements printed by your test code, or the whole run? I assume the former.Piste
With CQL and non-async, it takes about 0.5s to insert a batch of 10000 items (until the execute call returns). With ColumnListMutation the time until the execute returns is 0.040s on average. When I use async in both cases the difference becomes even larger. CQL stays at 0.5s on average, whereas ColumnListMutation goes down to 0.011s per insert on average. Regarding my requirements: I need to be able to create 1000 new rows per second, where each new row contains 10000 items. This is a bit more than I am really going to need, but it's good to have some margin.Galvani
When using async, the individual timings are pretty much meaningless. So if I say 1000 batches of 10000 take 545s, I mean the time until the entire 1000 batches complete.Galvani
R
2

Some things you can try... In your cassandra.yaml (this is Cassandra 1.2.x, maybe the params are called somewhat differently in 2.x):

  • Disable the row cache (row_cache_size_in_mb: 0)
  • Increase the memory limit before in-memory rows spill over to disk (in_memory_compaction_limit_in_mb); only do this if you see log output that says spilling does happen
  • Make sure num_tokens / initial_token values are configured properly so rows get distributed across your nodes

Other things you can try:

  • Provide all node IPs in your cluster to the client, not just one
  • Provide more RAM to each Cassandra node
  • Try to run your test multi-threaded
  • Make sure you have JNA installed and in use if you run Cassandra on Linux

Things to clarify:

  • Have you confirmed via nodetool that the 3 nodes have found each other?
  • What does nodetool say about the load distribution of your 3 nodes?
  • What does the physical host of your virtual cluster say about CPU and I/O usage? Maybe it has simply maxed out?
Rarity answered 14/2, 2014 at 15:48 Comment(4)
Thanks for the tuning tips. But I think the three nodes work OK. I did run another test that just inserted large blobs, and I got acceptable performance. Nodetool output also looks OK. I think I will have to implement the same benchmark in a thrift API and see how this performs.Galvani
I have now implemented essentially the same thing using the Astyanax thrift API (via a ColumnListMutation), and I am getting 15 times better performance. I would be happy to use the lower level API. In fact I prefer it to CQL since it is closer to the physical data layout. But it seems that its use is discouraged and deprecated.Galvani
@RüdigerKlaehn according to Astyanax devs the thrift API is just slightly faster techblog.netflix.com/2013/12/astyanax-update.html so there must be another cause for this.Eikon
It might be that I am doing something wrong with the prepared statements. But what? The code is pretty simple.Galvani
