Spark-Cassandra Connector : Failed to open native connection to Cassandra

I am new to Spark and Cassandra. When I try to submit a Spark job, I get an error while connecting to Cassandra.

Details:

Versions:

Spark: 1.3.1 (built for Hadoop 2.6 or later: spark-1.3.1-bin-hadoop2.6)
Cassandra: 2.0
Spark-Cassandra-Connector: 1.3.0-M1
Scala: 2.10.5

Spark and Cassandra run on a virtual cluster. Cluster details:

Spark Master : 192.168.101.13
Spark Slaves : 192.168.101.11 and 192.168.101.12
Cassandra Nodes: 192.168.101.11 (seed node) and 192.168.101.12

I am trying to submit a job from my client machine (laptop), 172.16.0.6. After googling this error, I made sure that I can ping all the machines in the cluster from the client machine (Spark master/slaves and Cassandra nodes) and also disabled the firewall on all machines. But I am still struggling with this error.

Cassandra.yaml

listen_address: 192.168.101.11 (192.168.101.12 on other cassandra node)
start_native_transport: true
native_transport_port: 9042
start_rpc: true
rpc_address: 192.168.101.11 (192.168.101.12 on other cassandra node)
rpc_port: 9160

I am trying to run a minimal sample job

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector._

val rdd = sc.cassandraTable("test", "words")
rdd.toArray.foreach(println)

To submit the job, I use spark-shell (and :paste the code into the shell):

    spark-shell --jars "/home/ameya/.m2/repository/com/datastax/spark/spark-cassandra-connector_2.10/1.3.0-M1/spark-cassandra-connector_2.10-1.3.0-M1.jar","/home/ameya/.m2/repository/com/datastax/cassandra/cassandra-driver-core/2.1.5/cassandra-driver-core-2.1.5.jar","/home/ameya/.m2/repository/com/google/collections/google-collections/1.0/google-collections-1.0.jar","/home/ameya/.m2/repository/io/netty/netty/3.8.0.Final/netty-3.8.0.Final.jar","/home/ameya/.m2/repository/com/google/guava/guava/14.0.1/guava-14.0.1.jar","/home/ameya/.m2/repository/io/dropwizard/metrics/metrics-core/3.1.0/metrics-core-3.1.0.jar","/home/ameya/.m2/repository/org/slf4j/slf4j-api/1.7.10/slf4j-api-1.7.10.jar","/home/ameya/.m2/repository/com/google/collections/google-collections/1.0/google-collections-1.0.jar","/home/ameya/.m2/repository/io/netty/netty/3.8.0.Final/netty-3.8.0.Final.jar","/home/ameya/.m2/repository/com/google/guava/guava/14.0.1/guava-14.0.1.jar","/home/ameya/.m2/repository/org/apache/cassandra/cassandra-clientutil/2.1.5/cassandra-clientutil-2.1.5.jar","/home/ameya/.m2/repository/joda-time/joda-time/2.3/joda-time-2.3.jar","/home/ameya/.m2/repository/org/apache/cassandra/cassandra-thrift/2.1.3/cassandra-thrift-2.1.3.jar","/home/ameya/.m2/repository/org/joda/joda-convert/1.2/joda-convert-1.2.jar","/home/ameya/.m2/repository/org/apache/thrift/libthrift/0.9.2/libthrift-0.9.2.jar","/home/ameya/.m2/repository/org/apache/thrift/libthrift/0.9.2/libthrift-0.9.2.jar" --master spark://192.168.101.13:7077 --conf spark.cassandra.connection.host=192.168.101.11 --conf spark.cassandra.auth.username=cassandra --conf spark.cassandra.auth.password=cassandra

The error I am getting:

warning: there were 1 deprecation warning(s); re-run with -deprecation for details
**java.io.IOException: Failed to open native connection to Cassandra at {192.168.101.11}:9042**
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:181)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:167)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:167)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:76)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:104)
    at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:115)
    at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:243)
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:49)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:148)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:118)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:813)
    at org.apache.spark.rdd.RDD.toArray(RDD.scala:833)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
    at $iwC$$iwC$$iwC.<init>(<console>:54)
    at $iwC$$iwC.<init>(<console>:56)
    at $iwC.<init>(<console>:58)
    at <init>(<console>:60)
    at .<init>(<console>:64)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
**Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.101.11:9042 (com.datastax.driver.core.TransportException: [/192.168.101.11:9042] Connection has been closed))**
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:223)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:78)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1236)
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:333)
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:174)
    ... 71 more

Can anyone point out what I am doing wrong here?

Furore answered 18/6, 2015 at 23:37 Comment(0)

You did not specify spark.cassandra.connection.host. By default, Spark assumes that the Cassandra host is the same as the Spark master node.

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

val master = "spark://192.168.101.13:7077"  // Spark master from the question
val conf = new SparkConf()
  .setAppName("Cassandra Demo")
  .setMaster(master)
  .set("spark.cassandra.connection.host", "192.168.101.11")
val sc = new SparkContext(conf)

val rdd = sc.cassandraTable("test", "words")
rdd.toArray.foreach(println)

It should work if you have properly set the seed node in cassandra.yaml.
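
For reference, the relevant section of cassandra.yaml would look roughly like this (a minimal sketch assuming 192.168.101.11 is the only seed, as in the question):

seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          # comma-separated list of seed node addresses
          - seeds: "192.168.101.11"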

Doelling answered 23/6, 2015 at 6:40 Comment(1)
My bad, I forgot to mention that configuration; I had already done that. However, the issue is resolved. It was a dependency issue: when I supplied a fat jar with dependencies, it worked out. – Furore

I struggled with this issue overnight and finally got a combination that works. I am writing it down for those who may run into a similar issue.

First of all, this is a version issue with the cassandra-driver-core dependency, but tracking down the exact combination that works took me quite a bit of time.

Secondly, this is the combination that works for me.

  1. Spark 1.6.2 with Hadoop 2.6, Cassandra 2.1.5 (Ubuntu 14.04, Java 1.8).
  2. In build.sbt (sbt assembly, scalaVersion := "2.10.5"), use:

"com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0", "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5"

Thirdly, let me explain my frustrations. With spark-cassandra-connector 1.5.0, I can run the assembly with spark-submit with --master "local[2]" on the same machine, with a remote Cassandra connection, without any problem. Any combination of connector 1.5.0 or 1.6.0 with Cassandra 2.0, 2.1, 2.2, or 3.4 works well. But if I try to submit the job to a cluster from the same machine (NodeManager) with --master yarn --deploy-mode cluster, then I always run into the problem: Failed to open native connection to Cassandra at {192.168.122.12}:9042

What is going on here? Can anyone from DataStax take a look at this issue? I can only guess it has something to do with "cqlversion", which should match the version of the Cassandra cluster.

Does anybody know a better solution?

Larissa answered 25/7, 2016 at 19:32 Comment(0)

It finally worked.

Steps:

  1. Set listen_address to the private IP of the EC2 instance.
  2. Do not set any broadcast_address.
  3. Set rpc_address to 0.0.0.0.
  4. Set broadcast_rpc_address to the public IP of the EC2 instance.
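
Put together, the address-related entries in cassandra.yaml would look roughly like this (the IPs below are placeholders, not real instance addresses):

listen_address: 10.0.0.5              # private IP of the EC2 instance
# broadcast_address: left unset
rpc_address: 0.0.0.0
broadcast_rpc_address: 203.0.113.10   # public IP of the EC2 instance
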
Ummersen answered 22/10, 2016 at 6:57 Comment(0)

The issue is resolved. It was due to a mix-up with the dependencies. I built a jar with dependencies (a fat jar) and passed it to spark-submit, instead of specifying the dependent jars separately.
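
For those asking in the comments: the general approach is to build an assembly (fat) jar, for example with the sbt-assembly plugin, and submit that single jar. A rough sketch, where the plugin version, class name, and jar name are placeholders rather than the exact ones used:

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

# build the fat jar, then submit it as a single artifact
sbt assembly
spark-submit --class com.example.WordCount \
  --master spark://192.168.101.13:7077 \
  --conf spark.cassandra.connection.host=192.168.101.11 \
  target/scala-2.10/myapp-assembly-1.0.jar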

Furore answered 26/6, 2015 at 17:33 Comment(2)
I'm having the same issue. Can you share the name of the jar you used to make it work? Or how you built it with the dependencies? – Commingle
I'm having the same issue as well. Would you please describe which jars were necessary? – Bucktooth

This is an issue with the version of the cassandra-driver-core jar dependency.

The Cassandra version in use is 2.0.
The provided cassandra-driver-core jar version is 2.1.5.

The driver jar version should match the version of Cassandra that is running.

In this case, the included jar file should be cassandra-driver-core-2.0.0.jar.
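
If the dependency is managed through sbt rather than a manually supplied jar, that would mean pinning the driver to the 2.0.x line, roughly:

// sketch: pin the DataStax Java driver to the 2.0.x series to match Cassandra 2.0
libraryDependencies += "com.datastax.cassandra" % "cassandra-driver-core" % "2.0.0"
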
Taggart answered 20/6, 2016 at 11:3 Comment(0)
