Performance of token range based queries on partition keys?
Asked Answered
I am selecting all records from the Cassandra nodes based on the token ranges of my partition key.

Below is the code:

  private static Cluster cluster;  // cached Cluster reference reused across calls

  public static synchronized List<Object[]> getTokenRanges(
      final Session session) {

    if (cluster == null) {
      cluster = session.getCluster();
    }

    Metadata metadata = cluster.getMetadata();

    return unwrapTokenRanges(metadata.getTokenRanges());
  }

  private static List<Object[]> unwrapTokenRanges(Set<TokenRange> wrappedRanges) {

    final int tokensSize = 2;
    List<Object[]> tokenRanges = new ArrayList<>();
    for (TokenRange tokenRange : wrappedRanges) {
      List<TokenRange> unwrappedTokenRangeList = tokenRange.unwrap();
      for (TokenRange unwrappedTokenRange : unwrappedTokenRangeList) {
        Object[] objects = new Object[tokensSize];
        objects[0] = unwrappedTokenRange.getStart().getValue();
        objects[1] = unwrappedTokenRange.getEnd().getValue();
        tokenRanges.add(objects);
      }
    }
    return tokenRanges;
  }

getTokenRanges gives me all the token ranges of the vnodes across all nodes.

Then I am using these token ranges to query Cassandra; object[0] holds the start token of the vnode and object[1] the end token.

This generates the query below:

SELECT * FROM my_key_space.tablename WHERE token(id) > <start token number> AND token(id) <= <end token number>;

In the above, the id column is the partition key.

In Cassandra it is not recommended to perform range queries, so will this query be performant?

From what I know, this query will hit only the individual partition range/vnode and will not touch multiple partitions, and hence there should not be any performance issue. Is this correct?

Cassandra version: 3.x
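
For reference, a minimal sketch of how these per-range queries could be executed with the DataStax Java driver 3.x (assuming the default Murmur3Partitioner, so the token values returned by getValue() are Longs, plus the usual com.datastax.driver.core imports):

// Illustrative helper, not part of the original code: runs one query per token range
// produced by getTokenRanges above.
public static long scanAllTokenRanges(final Session session) {
  long rowCount = 0;
  // Each Object[] holds {start token (exclusive), end token (inclusive)} of one vnode range.
  for (Object[] range : getTokenRanges(session)) {
    SimpleStatement statement = new SimpleStatement(
        "SELECT * FROM my_key_space.tablename WHERE token(id) > ? AND token(id) <= ?",
        range[0], range[1]);
    for (Row row : session.execute(statement)) {
      rowCount++; // ... process each row here
    }
  }
  return rowCount;
}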

Maurizio answered 9/1, 2019 at 6:31 Comment(0)
Queries on token ranges are performant, and Spark uses them for efficient data fetching. But you need to keep the following in mind: getTokenRanges will give you all existing token ranges, but there are edge cases - the last range goes from some positive number to the negative number that represents the first range, and as such your query won't return anything. Basically, you miss the data between MIN_TOKEN and the first token, and between the last token and MAX_TOKEN. The Spark Connector generates different CQL statements based on the token. Plus, you need to route the query to the correct node - this could be done via setRoutingToken.

A similar approach could be used in Java code (full code):

    Metadata metadata = cluster.getMetadata();
    List<TokenRange> ranges = new ArrayList<>(metadata.getTokenRanges());
    Collections.sort(ranges);
    System.out.println("Processing " + (ranges.size() + 1) + " token ranges...");

    Token minToken = ranges.get(0).getStart();
    String baseQuery = "SELECT id, col1 FROM test.range_scan WHERE ";
    Map<String, Token> queries = new HashMap<>();
    // generate queries for every range
    for (int i = 0; i < ranges.size(); i++) {
        TokenRange range = ranges.get(i);
        Token rangeStart = range.getStart();
        Token rangeEnd = range.getEnd();
        if (i == 0) {
            // the "missing" slice between MIN_TOKEN and the first token
            queries.put(baseQuery + "token(id) <= " + minToken, minToken);
            queries.put(baseQuery + "token(id) > " + rangeStart + " AND token(id) <= " + rangeEnd, rangeEnd);
        } else if (rangeEnd.equals(minToken)) {
            // the wrapping range: everything above the last token, up to MAX_TOKEN
            queries.put(baseQuery + "token(id) > " + rangeStart, rangeEnd);
        } else {
            queries.put(baseQuery + "token(id) > " + rangeStart + " AND token(id) <= " + rangeEnd, rangeEnd);
        }
    }

    // Note: this could be sped up by using async queries (see the sketch below),
    // but for illustration it's ok
    long rowCount = 0;
    for (Map.Entry<String, Token> entry : queries.entrySet()) {
        SimpleStatement statement = new SimpleStatement(entry.getKey());
        statement.setRoutingToken(entry.getValue());
        ResultSet rs = session.execute(statement);
        // .... process data
    }
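
As a follow-up to the async note in the code: a rough sketch of what that could look like with the same 3.x driver API (executeAsync returns a ResultSetFuture). This fires all statements at once, so in practice you would throttle or batch the futures:

    // Illustrative async variant of the loop above, not a drop-in replacement:
    // submit all statements first, then iterate the results as they complete.
    List<ResultSetFuture> futures = new ArrayList<>();
    for (Map.Entry<String, Token> entry : queries.entrySet()) {
        SimpleStatement statement = new SimpleStatement(entry.getKey());
        statement.setRoutingToken(entry.getValue());
        futures.add(session.executeAsync(statement));
    }

    long rowCount = 0;
    for (ResultSetFuture future : futures) {
        ResultSet rs = future.getUninterruptibly();  // blocks until this query completes
        for (Row row : rs) {
            rowCount++;  // .... process data
        }
    }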
Delicatessen answered 9/1, 2019 at 11:5 Comment(11)
Thanks for the detailed answer. On your edge case comment: in my code I am calling tokenRange.unwrap(). I was thinking this call would divide the wrapping range into 2 parts: the first part from the last token to MIN_TOKEN and the second part from MIN_TOKEN to the first token. Won't this solve the edge case you talked about? So in my case, if I have a total of 64 tokens across nodes, I will get a list of ranges with 65 entries, the last 2 being unwrapped. Can you please confirm?Maurizio
On your comment about statement.setRoutingToken(): I do not see this method in the version I am running. I am using spark-cassandra-connector_2.11, version 2.3.2. Is there any such method that I can use? Further, what will happen if I don't pass a routing token? I was under the assumption that this query would be routed by any node to the correct node based on the token range?Maurizio
No, I checked - this routing doesn't happen automatically for token ranges - that's why you need to pass it explicitly. But if you have the Spark Cassandra Connector - why are you doing it manually?Delicatessen
I am using Java for Spark jobs; I am not sure how to create JavaRDD token ranges using the Spark Cassandra Connector, and my other application, which is not Spark, also needs a select-all query.Maurizio
The Spark Connector does this for you - so you don't need to care about it - just look at its source code that I've linked. Also, if you're using Spark, I recommend using Spark SQL/Dataframes - it has more optimizations. You can find more examples here: github.com/alexott/dse-java-playground/tree/master/src/main/…Delicatessen
Sure Alex, I checked CassandraJavaUtil. I needed JSON data as the select results, which CassandraRow provides as a string; I think I can change my design to use the Spark connector. Thanks for the helpMaurizio
Yes - you can save JSON from dataframeDelicatessen
Just a dumb question: when I fetch CassandraRow using CassandraJavaUtil, does it fetch all records in one go? I have 40 GB of data; if I load that in one go, I might run into memory issues in my Spark driver. Currently, I was running these token range based queries in individual executors and processing the data in the executors.Maurizio
In spark it depends on your logic inside SparkDelicatessen
@AlexOtt For the edge case that you have mentioned, the last range (left, right) (where left is positive and right is negative) - can we split it into left to MAX_LONG and MIN_LONG to right? This would avoid multiple queries and simplify things if we are already using custom token range logic.Subaudition
It’s done - see first branch of ifDelicatessen
Yes, token range queries, as opposed to ordinary range queries on the actual partition key, are indeed performant, because they can read from disk sequentially (the partitions are stored on disk in sequential token order) and read sequential data from the same node (adjacent tokens belong to the same node).

Cassandra gives you a hint that this sort of query will perform well in that it doesn't require you to use "ALLOW FILTERING". Had you tried to do a range query on the actual partition key (not its token), you would have been required to add "ALLOW FILTERING" to show you are aware that this will have bad performance.
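
A minimal illustration of the contrast, assuming the question's table and the DataStax Java driver 3.x (exact server behaviour for the second statement depends on the Cassandra version):

    // Token range restriction on the partition key: accepted without ALLOW FILTERING,
    // because it maps to one contiguous slice of the token ring.
    session.execute(
        "SELECT * FROM my_key_space.tablename WHERE token(id) > ? AND token(id) <= ?",
        -4611686018427387904L, 0L);  // example Murmur3 token bounds

    // Range restriction on the raw partition key value: a full-cluster scan, so
    // Cassandra refuses it unless you explicitly acknowledge the cost, e.g.:
    //   SELECT * FROM my_key_space.tablename WHERE id > 100 AND id <= 200 ALLOW FILTERING;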

Vazquez answered 9/1, 2019 at 8:57 Comment(0)
