Issue with a full table scan in Cassandra

First: I know it isn't a good idea to do a full scan in Cassandra; however, at the moment, it is what I need.

When I started looking into how to do something like this, I read people saying it wasn't possible to do a full scan in Cassandra and that it wasn't made for this type of thing.

Not satisfied, I kept looking until I found this article: http://www.myhowto.org/bigdata/2013/11/04/scanning-the-entire-cassandra-column-family-with-cql/

It looked pretty reasonable and I gave it a try. Since I will do this full scan only once, and time and performance aren't an issue, I wrote the query and put it in a simple job to look up all the records that I want. From 2 billion rows, something like 1000 was my expected output; however, I got only 100 records.

My job:

public void run() {
    Cluster cluster = getConnection();
    Session session = cluster.connect("db");

    LOGGER.info("Starting ...");

    boolean run = true;
    int print = 0;

    while ( run ) {
        if (maxTokenReached(actualToken)) {
            LOGGER.info("Max Token Reached!");
            break;
        }
        ResultSet resultSet = session.execute(queryBuilder(actualToken));

        Iterator<Row> rows = resultSet.iterator();
        if ( !rows.hasNext()){
            break;
        }

        List<String> rowIds = new ArrayList<String>();

        while (rows.hasNext()) {
            Row row = rows.next();

            Long leadTime = row.getLong("my_column");
            if (myCondition(leadTime)) {
                String rowId = row.getString("key");
                rowIds.add(rowId);
            }

            if (!rows.hasNext()) {
                Long token = row.getLong("token(key)");
                if (!rowIds.isEmpty()) {
                    LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
                }
                actualToken = nextToken(token);
            }

        }

    }
    LOGGER.info("Done!");
    cluster.shutdown();
}

public boolean maxTokenReached(Long actualToken){
    return actualToken >= maxToken;
}

public String queryBuilder(Long nextRange) {
    return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString());
}

public Long nextToken(Long token){
    return token + 1;
}

Basically, what I do is start from the minimum allowed token and move up incrementally until the last one.

I don't know why, but it seems as if the job did not actually complete the full scan, or as if my query only hit a single node or something. I don't know if I'm doing something wrong, or if it really isn't possible to do a full scan.

Today I have almost 2 TB of data in a single table on a cluster of seven nodes.

Has anyone been in this situation before, or does anyone have a recommendation?

Defant answered 24/4, 2015 at 1:4 Comment(2)
What is the keyspace schema for 'mytable'? Is the query running multiple times (because of the while loop)? The last query might be returning 100 instead of 1000. – Unkindly
Schema: pastebin.com/DyWAc1wa. And yes, the query runs multiple times and returns all the rows set by the LIMIT clause. – Defant

It's definitely possible to do a full table scan in Cassandra - indeed, it's quite common for things like Spark. However, it's not typically "fast", so it's discouraged unless you know why you're doing it. For your actual questions:

1) If you're using CQL, you're almost certainly using the Murmur3 partitioner, so your minimum token is -9223372036854775808 (and the maximum token is 9223372036854775807).
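
As a side note, here is a minimal sketch of how those ring boundaries could be read from the driver's metadata instead of hardcoding them (assuming the DataStax Java driver 2.1+, which exposes token range metadata):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.TokenRange;

...

// Print the token ranges the ring is split into; with Murmur3Partitioner the
// full ring spans -9223372036854775808 .. 9223372036854775807.
for (TokenRange range : cluster.getMetadata().getTokenRanges()) {
    System.out.println(range.getStart() + " .. " + range.getEnd());
}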

2) You're using session.execute(), which will use a default consistency of ONE, which may not return all of the results in your cluster, especially if you're also writing at ONE, which I suspect you may be. Raise that to ALL, and use prepared statements to speed up the CQL parsing:

 public void run() {
     Cluster cluster = getConnection();
     Session session = cluster.connect("db");
     LOGGER.info("Starting ...");
     actualToken = Long.MIN_VALUE; // -9223372036854775808, the minimum Murmur3 token
     boolean run = true;
     int print = 0;

     while ( run ) {
         if (maxTokenReached(actualToken)) {
             LOGGER.info("Max Token Reached!");
             break;
         }
         SimpleStatement stmt = new SimpleStatement(queryBuilder(actualToken));
         stmt.setConsistencyLevel(ConsistencyLevel.ALL);
         ResultSet resultSet = session.execute(stmt);

         Iterator<Row> rows = resultSet.iterator();
         if ( !rows.hasNext()){
             break;
         }

         List<String> rowIds = new ArrayList<String>();

         while (rows.hasNext()) {
             Row row = rows.next();

             Long leadTime = row.getLong("my_column");
             if (myCondition(leadTime)) {
                 String rowId = row.getString("key");
                 rowIds.add(rowId);
             }

             if (!rows.hasNext()) {
                 Long token = row.getLong("token(key)");
                 if (!rowIds.isEmpty()) {
                     LOGGER.info(String.format("Keys found! RowId's: %s ", rowIds));
                 }
                 actualToken = nextToken(token);
             }
         }
      }
     LOGGER.info("Done!");
     cluster.shutdown(); 
  }

public boolean maxTokenReached(Long actualToken){
     return actualToken >= maxToken; 
 }

 public String queryBuilder(Long nextRange) {
     return String.format("select token(key), key, my_column from mytable where token(key) >= %s limit 10000;", nextRange.toString()); 
 }

 public Long nextToken(Long token) {
     return token + 1; 
 }
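
For reference, a minimal sketch of the prepared-statement variant mentioned above (assuming the DataStax Java driver 2.x; the table and column names come from the question's query, and fetchPage is a hypothetical helper):

import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;

...

private PreparedStatement scanStatement; // prepared once, reused for every page

public ResultSet fetchPage(Session session, long startToken) {
    if (scanStatement == null) {
        scanStatement = session.prepare(
                "select token(key), key, my_column from mytable where token(key) >= ? limit 10000");
        scanStatement.setConsistencyLevel(ConsistencyLevel.ALL);
    }
    // binding only the start token avoids re-parsing the CQL on every iteration
    return session.execute(scanStatement.bind(startToken));
}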
Forcemeat answered 28/4, 2015 at 3:49 Comment(8)
Hey Jeff, first, thanks for your help! I'm using CQL with Murmur3 and I'm aware of the max and min token values. The job today receives a range of tokens within which it searches the rows. This way I could spread those ranges across threads to speed things up. – Defant
Second, I implemented your suggestion, but I didn't see much difference from what I had done; actually, the job returned less than the first time. But one thing I noticed was that the machine load was lower than before and more evenly distributed across the cluster the whole time the job was running. Before, the load got high only on specific machines and at different times. – Defant
Raising the consistency should cause more load, because it's querying more replicas to ensure it's not missing any data. To be clear: how many rows did it return, and how many rows do you expect it to return? – Forcemeat
Exactly. I expected something like 1000 rows and only got something like 100~200. – Defant
Have you run SELECT COUNT(*) to count? That uses the internal paging and should be fairly accurate. – Forcemeat
To use a WHERE clause, the columns used in the query need to be part of the primary key or already indexed, and they aren't, so I cannot use your approach. I know the expected output should be something like 1k because I store part of the info in two different databases (Cassandra and MySQL), and today the amount of data differs between them. – Defant
I am curious whether there is any update on this. Does this answer miss rows? Quoting an earlier comment: "I expected something like 1000 rows and only got something like 100~200." – Amaranth
On the point of using CONSISTENCY=ALL: it is risky, since any node failure will cause an application exception. – Immoderate

I'd highly recommend using Spark, even in a standalone application (i.e. without a cluster). It'll take care of chunking up the partitions and processing them one by one. Dead easy to use too:

https://github.com/datastax/spark-cassandra-connector
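
For illustration, a minimal sketch of what such a scan could look like with the connector's Java API (the keyspace, table and column names follow the question; the filter is just a stand-in for the asker's myCondition):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

public class FullScanJob {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("cassandra-full-scan")
                .setMaster("local[*]") // standalone, no Spark cluster required
                .set("spark.cassandra.connection.host", "127.0.0.1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // The connector splits the token ring into chunks and scans them one by one.
        long matches = javaFunctions(sc)
                .cassandraTable("db", "mytable")
                .filter(row -> row.getLong("my_column") > 0L) // stand-in for myCondition
                .count();

        System.out.println("Matching rows: " + matches);
        sc.stop();
    }
}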

Marquet answered 28/4, 2015 at 15:10 Comment(0)

This is a very old question, but I'm answering it because I ran into the same problem of not getting all the rows and found the cause.

This problem occurs when there are multiple rows for one partition key.

In the above implementation, when the LIMIT cuts a page off in the middle of a partition, the rest of the rows in that partition are lost.

This is because the next query's WHERE clause starts reading from the token of the next partition.

For example, suppose we have a table like the following

partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
              1|     1|                     1
              1|     2|                     1
              1|     3|                     1
              2|     4|                     2
              2|     5|                     2
              2|     6|                     2
              3|     7|                     3
              4|     8|                     4

If we run the above example code with LIMIT 2 on this table, we get...

1st iteration

SELECT partitionKeyCol, IdxCol, token(partitionKeyCol) FROM table WHERE token(partitionKeyCol) > 0 LIMIT 2;
partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
              1|     1|                     1
              1|     2|                     1

2nd iteration

SELECT partitionKeyCol, IdxCol, token(partitionKeyCol) FROM table WHERE token(partitionKeyCol) > 1 LIMIT 2;
partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
              2|     4|                     2
              2|     5|                     2

3rd iteration

SELECT partitionKeyCol, IdxCol, token(partitionKeyCol) FROM table WHERE token(partitionKeyCol) > 2 LIMIT 2;
partitionKeyCol|IdxCol|token(partitionKeyCol)
---------------------------------------------
              3|     7|                     3
              4|     8|                     4

As a result, we never get the rows with IdxCol 3 and 6.

This is a common paging query implementation mistake.
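
A minimal sketch of one way to avoid this, assuming a Cassandra/driver combination with native-protocol paging (Cassandra 2.0+ and the DataStax Java driver 2.x): let the driver page through the table transparently instead of emulating paging with token() + LIMIT.

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;

...

Statement stmt = new SimpleStatement(
        "SELECT partitionKeyCol, IdxCol, token(partitionKeyCol) FROM table");
stmt.setFetchSize(2); // page size only; the driver resumes exactly where the last page ended

ResultSet rs = session.execute(stmt);
for (Row row : rs) { // fetches the next page automatically, so no rows are skipped
    // process row
}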

Synodic answered 13/8, 2021 at 10:2 Comment(0)

Is this something you need to do regularly, or a one-off scenario? I agree this is not something you want to do on a regular basis, but I also had a case where I had to read through all the rows of a column family, and I relied on the AllRowsReader recipe from the Astyanax client. I see that you are using the DataStax CQL driver to connect to your cluster, but if what you're looking for is something proven to work, you might not mind tackling the problem with the Astyanax library.

In my case I used it to read all the row keys, and then I had another job that interacted with the column family using the keys I collected.

import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.recipes.reader.AllRowsReader;

import java.util.concurrent.CopyOnWriteArrayList;

...        

private final Keyspace keyspace;
private final ColumnFamily<String, byte[]> columnFamily;

public List<String> getAllKeys() throws Exception {

    final List<String> rowKeys = new CopyOnWriteArrayList<>();

    new AllRowsReader.Builder<>(keyspace, columnFamily)
        .withColumnRange(null, null, false, 0) // fetch row keys only, no column data
        .withPartitioner(null) // null means: use the keyspace's partitioner
        .withConsistencyLevel(ConsistencyLevel.CL_ONE)
        .forEachRow(row -> {
        if (row == null) {
            return true;
        }

        String key = row.getKey();

        rowKeys.add(key);

        return true;
    }).build().call();

    return rowKeys;
}
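
A quick, purely illustrative usage sketch of the helper above (the follow-up processing is hypothetical):

// Collect every row key once, then hand the keys to whatever job needs
// to revisit those rows.
List<String> keys = getAllKeys();
System.out.println("Collected " + keys.size() + " row keys");
for (String key : keys) {
    // feed each key into the follow-up job
}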

There are different configuration options for running this across several threads, among other things. Like I said, I only ran this once in my code and it worked really well. I'd be happy to help if you run into issues trying to make it work.

Hope this helps,

José Luis

Millymilman answered 30/4, 2015 at 6:28 Comment(0)

If you regularly need to do full table scans of a Cassandra table, say for analytics in Spark, then I highly suggest you consider storing your data using a data model that is read-optimized. You can check out http://github.com/tuplejump/FiloDB for an example of a read-optimized setup on Cassandra.

Shanley answered 16/5, 2016 at 14:51 Comment(0)
