Cassandra timeout during read query at consistency ONE (1 responses were required but only 0 replica responded)

I am running read and update queries on a table with 500,000 rows, and I sometimes get the error below after processing around 300,000 rows, even though no node is down.

Cassandra timeout during read query at consistency ONE (1 responses were required but only 0 replica responded)

Infrastructure details:
The cluster has 5 Cassandra nodes, 5 Spark nodes and 3 Hadoop nodes, each with 8 cores and 28 GB of memory. The Cassandra replication factor is 3.

Cassandra 2.1.8.621 | DSE 4.7.1 | Spark 1.2.1 | Hadoop 2.7.1.

Cassandra configuration:

read_request_timeout_in_ms (ms): 10000
range_request_timeout_in_ms (ms): 10000
write_request_timeout_in_ms (ms): 5000
cas_contention_timeout_in_ms (ms): 1000 
truncate_request_timeout_in_ms (ms): 60000
request_timeout_in_ms (ms): 10000

I have also tried the same job with read_request_timeout_in_ms increased to 20000, but it didn't help.

I am doing queries on two tables. Below is the create statement for one of the tables:

Create Table:

CREATE TABLE section_ks.testproblem_section (
    problem_uuid text PRIMARY KEY,
    documentation_date timestamp,
    mapped_code_system text,
    mapped_problem_code text,
    mapped_problem_text text,
    mapped_problem_type_code text,
    mapped_problem_type_text text,
    negation_ind text,
    patient_id text,
    practice_uid text,
    problem_category text,
    problem_code text,
    problem_comment text,
    problem_health_status_code text,
    problem_health_status_text text,
    problem_onset_date timestamp,
    problem_resolution_date timestamp,
    problem_status_code text,
    problem_status_text text,
    problem_text text,
    problem_type_code text,
    problem_type_text text,
    target_site_code text,
    target_site_text text
    ) WITH bloom_filter_fp_chance = 0.01
    AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';

Queries :

1) SELECT encounter_uuid, encounter_start_date FROM section_ks.encounters WHERE patient_id = '1234' AND encounter_start_date >= '" + formatted_documentation_date + "' ALLOW FILTERING;

2) UPDATE section_ks.encounters SET testproblem_uuid_set = testproblem_uuid_set + {'1256'} WHERE encounter_uuid = 'abcd345';

Tritanopia answered 1/9, 2015 at 9:7 Comment(6)
Can you post your CREATE TABLE statement? - Kenyon
...and your query. I would also try TRACING ON to analyze the issue. - Mohamedmohammad
@Kenyon I have added the CREATE TABLE statement. Thanks for responding. - Tritanopia
@Mohamedmohammad I have added the queries as well. - Tritanopia
Don't use ALLOW FILTERING in production OLTP queries. It will be slow. Instead, design your table's primary key (partition and clustering columns) so that you can use a regular CQL query. - Kenyon
@Kenyon - I will take your suggestion into consideration and try implementing it. - Tritanopia

Usually when you get a timeout error it means you are trying to do something that isn't scaling well in Cassandra. The fix is often to modify your schema.

I suggest you monitor the nodes while running your query to see if you can spot the problem area. For example, you can run "watch -n 1 nodetool tpstats" to see if any queues are backing up or dropping items. See other monitoring suggestions here.

One thing that might be off in your configuration is that you say you have five Cassandra nodes, but only 3 spark workers (or are you saying you have three spark workers on each Cassandra node?) You'll want at least one spark worker on each Cassandra node so that loading data into spark is done locally on each node and not over the network.

It's hard to tell much more than that without seeing your schema and the query you are running. Are you reading from a single partition? I started getting timeout errors in the vicinity of 300,000 rows when reading from a single partition. See question here. The only workaround I have found so far is to use a client-side hash in my partition key to break the partitions up into smaller chunks of around 100K rows. So far I have not found a way to tell Cassandra not to time out on a query that I expect to take a long time.
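
A minimal sketch of the client-side hash idea described above, in Python. The bucket count, the function names, and the choice of MD5 are assumptions for illustration, not details of my actual setup. The point is only that a stable hash of some per-row value yields a small bucket number that can be added to the partition key, so one logical partition is spread over several smaller physical ones.

```python
import hashlib

# Hypothetical bucket count: choose it so each bucket stays near the
# ~100K-row size mentioned above (e.g. 1M rows / 100K = 10 buckets).
NUM_BUCKETS = 10


def bucket_for(row_id: str, num_buckets: int = NUM_BUCKETS) -> int:
    """Map a row identifier to a stable bucket in [0, num_buckets)."""
    # hashlib is used instead of the built-in hash(): hash() of a str is
    # salted per process, so it would not give the same bucket across runs.
    digest = hashlib.md5(row_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_buckets


def write_key(logical_key: str, row_id: str) -> tuple:
    """Composite partition key (logical_key, bucket) to use when writing a row."""
    return (logical_key, bucket_for(row_id))


def read_keys(logical_key: str, num_buckets: int = NUM_BUCKETS) -> list:
    """All composite partition keys to query to read the whole logical partition."""
    return [(logical_key, b) for b in range(num_buckets)]
```

Reading everything back then means one query per entry in `read_keys(...)`, each of which touches a much smaller partition than the original single one.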

Betimes answered 1/9, 2015 at 20:47 Comment(7)
Thanks a lot. I will try your suggestions. Sorry about the wrong/brief information about the cluster. It is actually an EC2 cluster with 5 Cassandra nodes and 5 Spark worker nodes; 2 of the Spark workers run on 2 of the Cassandra nodes, and the other 3 run on the nodes that also host Hadoop. How can I check how many partitions the data is being read from? - Tritanopia
nodetool cfstats and nodetool cfhistograms. - Kenyon
@Abhinandan - Your use of ALLOW FILTERING suggests you're trying to do a table scan. This is not efficient in Cassandra, so you should either restructure your schema to do queries against individual partitions or else load the table into a Spark RDD so that it can be worked on in parallel. - Betimes
@JimMeyer - I ran "watch -n 1 nodetool tpstats" and no queue is backing up or dropping items. I have already loaded the table into a Spark RDD and am querying that. Is there any workaround other than restructuring the schema? - Tritanopia
I changed concurrent_reads from 64 to 128, with 20 cores, and now it no longer gives any error. Is that the real solution? - Tritanopia
Were you getting the timeout error on a CQL query or on loading the data into Spark? I haven't experimented with concurrent_reads before. The main thing to consider is whether you really need a full table scan or whether you can partition your data into smaller chunks that can be queried less expensively. If so, then revising your schema would make sense. - Betimes
I have a Spark job that executes read, write and update queries, and it gives a read timeout error after some time. I need to read the full table because the update queries depend on it. - Tritanopia

I don't think configuration is the root cause; this looks like a data model issue.

It would help to see the structure of the section_ks.encounters table.

I suggest thinking carefully about which concrete queries you expect to run before designing the table structure.

As far as I can see, those two queries need different structures of section_ks.encounters to run with good performance.

Let's review each provided query and try to design tables:

First one:

SELECT encounter_uuid, encounter_start_date FROM section_ks.encounters WHERE patient_id = '1234' AND encounter_start_date >= '" + formatted_documentation_date + "' ALLOW FILTERING;

  • First point: if Cassandra forces you to add ALLOW FILTERING, that is a symptom of a non-optimal query or table structure.
  • Second point: the primary key. An awesome explanation of what primary keys are in Cassandra. The given query would run fast, and without the mandatory ALLOW FILTERING clause, if the patient_id and encounter_start_date columns formed a composite primary key. The order of the columns inside the PRIMARY KEY() clause should correspond to the order of filtering in your query.
  • Why is ALLOW FILTERING mandatory in the original query? The partition key tells Cassandra which node holds the data. When patient_id is not the partition key, Cassandra has to scan all 5 nodes to find the requested patient. With a lot of data spread across the nodes, such a full scan usually fails with a timeout.
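
The last point can be illustrated with a toy model in Python. This is not how Cassandra actually places data (it uses a token ring, virtual nodes, and replication). The MD5-based `node_for` function, the helper names, and the sample rows are assumptions made only to show why a lookup by partition key touches one node while a filter on any other column has to visit all of them.

```python
import hashlib

NUM_NODES = 5  # matches the five Cassandra nodes in the question


def node_for(partition_key: str) -> int:
    """Toy placement: hash the partition key to pick one node."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % NUM_NODES


def build_cluster(rows, key_column):
    """Distribute rows over the nodes by the chosen partition key column."""
    nodes = [[] for _ in range(NUM_NODES)]
    for row in rows:
        nodes[node_for(row[key_column])].append(row)
    return nodes


def lookup_by_partition_key(nodes, key_column, value):
    """Only the node that owns the key is read."""
    owner = node_for(value)
    hits = [r for r in nodes[owner] if r[key_column] == value]
    return hits, 1  # (rows, nodes visited)


def lookup_by_other_column(nodes, column, value):
    """The owner is unknown, so every node must be scanned."""
    hits = [r for node in nodes for r in node if r[column] == value]
    return hits, len(nodes)  # (rows, nodes visited)


# Hypothetical sample data: 100 encounters spread over 10 patients.
rows = [
    {"encounter_uuid": f"enc-{i}", "patient_id": f"patient-{i % 10}"}
    for i in range(100)
]
# Table partitioned by encounter_uuid, as the UPDATE query assumes.
cluster = build_cluster(rows, "encounter_uuid")

by_key, visited_key = lookup_by_partition_key(cluster, "encounter_uuid", "enc-42")
by_patient, visited_scan = lookup_by_other_column(cluster, "patient_id", "patient-2")
print(visited_key, visited_scan)  # prints: 1 5
```

In this model the scan cost grows with the total amount of data on every node, which is roughly why the filtered query gets slower and eventually times out as the table grows.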

Here is an example of a table structure that fits the given query well:

create table section_ks.encounters(
    patient_id bigint, 
    encounter_start_date timestamp, 
    encounter_uuid text,
    some_other_non_unique_column text,
    PRIMARY KEY (patient_id, encounter_start_date)
);
  • The patient_id column would be the "partition key". It is responsible for data distribution across Cassandra nodes. In simple terms (leaving replication aside): different ranges of patients would be stored on different nodes.
  • The encounter_start_date column would be the "clustering key". It is responsible for sorting data inside a partition.

ALLOW FILTERING can now be removed from the query:

SELECT encounter_uuid, encounter_start_date 
FROM section_ks.encounters 
WHERE patient_id = 1234 AND encounter_start_date >= '2017-08-19';
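
From application code, the date does not have to be concatenated into the query string as in the original query. Below is a hedged sketch that relies on the DataStax Python driver's `Session.prepare` and `Session.execute`. The `fetch_encounters` helper and its arguments are hypothetical, and `session` is assumed to be an already connected `cassandra.cluster.Session` for a cluster that has the table defined above.

```python
from datetime import datetime

# Bind markers (?) replace the string concatenation used in the original query.
ENCOUNTERS_SINCE = (
    "SELECT encounter_uuid, encounter_start_date "
    "FROM section_ks.encounters "
    "WHERE patient_id = ? AND encounter_start_date >= ?"
)


def fetch_encounters(session, patient_id: int, since: datetime) -> list:
    """Return a patient's encounters that start on or after `since`.

    `session` is assumed to be a connected cassandra.cluster.Session.
    patient_id is an int because the example table declares it as bigint.
    """
    # In a real job, prepare once and reuse the statement for every call.
    prepared = session.prepare(ENCOUNTERS_SINCE)
    return list(session.execute(prepared, (patient_id, since)))
```

A call would then look like `fetch_encounters(session, 1234, datetime(2017, 8, 19))`.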

Second query:

UPDATE section_ks.encounters SET testproblem_uuid_set = testproblem_uuid_set + {'1256'} WHERE encounter_uuid = 'abcd345';

The table structure should look something like this:

create table section_ks.encounters(
    encounter_uuid text, -- partition key
    patient_id bigint,
    testproblem_uuid_set set<text>, -- must be a set for the "+ {'1256'}" update to work
    some_other_non_unique_column text,
    PRIMARY KEY (encounter_uuid)
);

If we definitely want fast filtering by encounter_uuid alone, it should be defined as the partition key.

Good articles about designing an effective data model:

Elf answered 19/8, 2017 at 17:5 Comment(0)
