How to load Spark Cassandra Connector in the shell?

I am trying to use Spark Cassandra Connector in Spark 1.1.0.

I have successfully built the jar file from the master branch on GitHub and have gotten the included demos to work. However, when I try to load the jar files into the spark-shell I can't import any of the classes from the com.datastax.spark.connector package.

I have tried using the --jars option on spark-shell and adding the directory with the jar file to Java's CLASSPATH. Neither of these options works. In fact, when I use the --jars option, the logging output shows that the Datastax jar is being loaded, but I still cannot import anything from com.datastax.

I have been able to load the Tuplejump Calliope Cassandra connector into the spark-shell using --jars, so I know that's working. It's just the Datastax connector which is failing for me.

Snakeroot asked 14/9, 2014 at 19:57 Comment(1)
Same here. I built spark-cassandra-connector with sbt. I use the command $ ./spark-shell --jars ~/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector_2.10-1.2.0-SNAPSHOT.jar and see this in the log: INFO spark.SparkContext: Added JAR file:/root/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector_2.10-1.2.0-SNAPSHOT.jar at http://xx.xx.xx.xx:60296/jars/spark-cassandra-connector_2.10-1.2.0-SNAPSHOT.jar with timestamp 1414618174823, but I still cannot import com.datastax.spark.connector._. I'm using Spark 1.1.0. – Devoted

I got it. Below is what I did:

$ git clone https://github.com/datastax/spark-cassandra-connector.git
$ cd spark-cassandra-connector
$ sbt/sbt assembly
$ $SPARK_HOME/bin/spark-shell --jars ~/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/connector-assembly-1.2.0-SNAPSHOT.jar 

At the scala> prompt,

scala> sc.stop
scala> import com.datastax.spark.connector._
scala> import org.apache.spark.SparkContext
scala> import org.apache.spark.SparkContext._
scala> import org.apache.spark.SparkConf
scala> val conf = new SparkConf(true).set("spark.cassandra.connection.host", "my cassandra host")
scala> val sc = new SparkContext("spark://spark host:7077", "test", conf)
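
If you want to sanity-check the connection at this point, a quick read works. This is just a minimal sketch; "my_keyspace" and "my_table" are placeholders for a keyspace and table that already exist in your cluster:

scala> val rdd = sc.cassandraTable("my_keyspace", "my_table")
scala> rdd.first   // fetches a single CassandraRow
scala> rdd.count   // triggers a full table scan, so it may take a while on large tables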
Devoted answered 30/10, 2014 at 16:46 Comment(3)
finally this combination was working for me: spark-1.2.1 and spark-cassandra-connector-1.2.0-rc2 – Pooka
15/10/23 17:26:18 WARN AppClient$ClientEndpoint: Failed to connect to master localhost:7077 a – Unwept
please see @chris-batey's answer below - it's the only way to use sc.broadcast in the shell. If you stop the SparkContext and create a new one, you will get a NotSerializableException if you try to use sc.broadcast. – Mistaken

Edit: Things are a bit easier now

For in-depth instructions, check out the project documentation: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/13_spark_shell.md

Or feel free to use Spark Packages to load the library (not all versions are published there): http://spark-packages.org/package/datastax/spark-cassandra-connector

> $SPARK_HOME/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M3-s_2.10

The following assumes you are running with OSS Apache C*

You'll want to start the shell with --driver-class-path set to include all your connector libs.

I'll quote a blog post from the illustrious Amy Tobey

The easiest way I've found is to set the classpath and then restart the context in the REPL with the necessary classes imported to make sc.cassandraTable() visible. The newly loaded methods will not show up in tab completion. I don't know why.

  /opt/spark/bin/spark-shell --driver-class-path $(echo /path/to/connector/*.jar |sed 's/ /:/g')

It will print a bunch of log information and then present a scala> prompt.

scala> sc.stop

Now that the context is stopped, it’s time to import the connector.

scala> import com.datastax.spark.connector._
scala> val conf = new SparkConf()
scala> conf.set("spark.cassandra.connection.host", "node1.pc.datastax.com")
scala> val sc = new SparkContext("local[2]", "Cassandra Connector Test", conf)
scala> val table = sc.cassandraTable("keyspace", "table")
scala> table.count

If you are running with DSE < 4.5.1

There is a slight issue with the DSE classloader and previous package naming conventions that will prevent you from finding the new spark-connector libraries. You should be able to get around this by removing the line specifying the DSE classloader in the scripts that start spark-shell.

Winfield answered 18/9, 2014 at 17:46 Comment(5)
I tried this, but, unfortunately, I still can't import anything under com.datastax. It definitely looks like there's something wrong with my setup, so I will try again from the beginning. – Snakeroot
If you're using Spark 1.1 you need to use the latest alpha of the connector to make it work! – Burglarious
I'm stuck at error: object datastax is not a member of package com. Can someone please point me in the right direction? I'm using Spark 1.3. – Squamosal
@Squamosal I ran into this problem due to importing the wrong library: I imported spark-assembly rather than spark-cassandra-assembly. – Solubility
localhost directions? – Unwept

If you want to avoid stopping/starting the context in the shell, you can also add the Cassandra connection host to your Spark properties in:

{spark_install}/conf/spark-defaults.conf

spark.cassandra.connection.host=192.168.10.10
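
With the host set there, you can use the sc that spark-shell creates directly, with no stop/restart. A minimal sketch, assuming the connector jar is already on the shell's classpath (e.g. via --jars or --packages) and that "my_keyspace"/"my_table" already exist:

scala> import com.datastax.spark.connector._
scala> sc.cassandraTable("my_keyspace", "my_table").count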
Antiparallel answered 21/1, 2015 at 13:46 Comment(2)
bravo - this is actually the only correct way I've found of connecting to cassandra. All of the above result in the sc not being serialisable and broadcast variables not working in the spark-shell. – Mistaken
You can actually also just provide the connection host using --conf spark.cassandra.connection.host=127.0.0.1 when launching the shell spark.apache.org/docs/latest/… – Mistaken

To access Cassandra from the spark-shell, I've built an assembly out of the cassandra-spark-driver with all its dependencies (an "uberjar"), and I provide it to the spark-shell using the --jars option like this:

spark-shell --jars spark-cassandra-assembly-1.0.0-SNAPSHOT-jar-with-dependencies.jar

I was facing the same issue described here, and this method is both simple and convenient compared with loading the long list of dependencies individually.

I've created a gist with the POM file that you can download. To create the uberjar with that POM, run:

mvn package

If you're using sbt, look into the sbt-assembly plugin.
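
For reference, a minimal build.sbt sketch for producing such an uberjar with sbt-assembly might look like the following; the version numbers are illustrative, so match them to your Spark and connector versions, and keep spark-core marked "provided" so Spark itself is not bundled into the assembly:

    // project/plugins.sbt (illustrative plugin version)
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

    // build.sbt (illustrative Spark/connector versions)
    name := "spark-cassandra-uberjar"
    scalaVersion := "2.10.4"
    libraryDependencies ++= Seq(
      "org.apache.spark"   %% "spark-core"                % "1.1.0" % "provided",
      "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0"
    )

Running sbt assembly then produces a single jar under target/scala-2.10/ that you can pass to spark-shell --jars.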

Boddie answered 30/9, 2014 at 16:54 Comment(0)

The following steps describe how to set up a server with both a Spark node and a Cassandra node.

Setting Up Open Source Spark

This assumes you already have Cassandra set up.

Step 1: Download and set up Spark

Go to http://spark.apache.org/downloads.html.

a) To make things simple, we will use one of the prebuilt Spark packages. Choose Spark version 2.0.0 and Pre-built for Hadoop 2.7 then Direct Download. This will download an archive with the built binaries for Spark.

b) Extract this to a directory of your choosing. I will put mine in ~/apps/spark-1.2

c) Test that Spark is working by opening the shell

Step 2: Test that Spark Works

a) cd into the Spark directory and run "./bin/spark-shell". This will open up the Spark interactive shell.

b) If everything worked it should display this prompt: "scala>"

Run a simple calculation:

sc.parallelize(1 to 50).sum(), which should output 1275.0.

c) Congratulations, Spark is working! Exit the Spark shell with the command "exit".

The Spark Cassandra Connector

To connect Spark to a Cassandra cluster, the Cassandra Connector needs to be added to the Spark project. DataStax provides its own Cassandra Connector on GitHub, and we will use that.

  1. Clone the Spark Cassandra Connector repository:

    https://github.com/datastax/spark-cassandra-connector

  2. cd into "spark-cassandra-connector" and build the Spark Cassandra Connector by executing the command

    ./sbt/sbt -Dscala-2.11=true assembly

This should output compiled jar files to the directory named "target". There will be two jar files, one for Scala and one for Java. The jar we are interested in is the Scala one: "spark-cassandra-connector-assembly-1.1.1-SNAPSHOT.jar". Move the jar file into an easy-to-find directory; I put mine in ~/apps/spark-1.2/jars.

To load the connector into the Spark Shell:

start the shell with this command:

../bin/spark-shell --jars ~/apps/spark-1.2/jars/spark-cassandra-connector-assembly-1.1.1-SNAPSHOT.jar

To connect the Spark context to the Cassandra cluster, first stop the default context:

sc.stop

Import the necessary classes:

import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf

Make a new SparkConf with the Cassandra connection details:

val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")

Create a new Spark Context:

val sc = new SparkContext(conf)

You now have a new SparkContext which is connected to your Cassandra cluster.
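
As a quick sanity check (not part of the original steps), with the imports from above in place you can write a couple of rows and read them back. This sketch assumes a hypothetical table test.kv (key text PRIMARY KEY, value int) has already been created in cqlsh:

val toSave = sc.parallelize(Seq(("row1", 1), ("row2", 2)))
toSave.saveToCassandra("test", "kv", SomeColumns("key", "value"))
sc.cassandraTable("test", "kv").collect.foreach(println)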

Pacha answered 9/10, 2016 at 10:0 Comment(0)

A complete Spark Cassandra Connector example in Java, useful on Windows 7/8/10.

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.google.common.base.Optional;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
import java.io.Serializable;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.*;
import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App implements Serializable
{
    private transient SparkConf conf;

    private App(SparkConf conf) {
        this.conf = conf;
    }

    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        generateData(sc);
        compute(sc);
        showResults(sc);
        sc.stop();
    }

    private void generateData(JavaSparkContext sc) {
    CassandraConnector connector =   CassandraConnector.apply(sc.getConf());

        // Prepare the schema
   try{ 
   Session session=connector.openSession();
   session.execute("DROP KEYSPACE IF EXISTS java_api");
   session.execute("CREATE KEYSPACE java_api WITH 
   replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
   session.execute("CREATE TABLE java_api.products 
   (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
   session.execute("CREATE TABLE java_api.sales 
   (id UUID PRIMARY KEY,  product INT, price DECIMAL)");
   session.execute("CREATE TABLE java_api.summaries 
   (product INT PRIMARY KEY, summary DECIMAL)");
  }catch(Exception e){System.out.println(e);}

        // Prepare the products hierarchy
   List<Product> products = Arrays.asList(
   new Product(0, "All products", Collections.<Integer>emptyList()),
                new Product(1, "Product A", Arrays.asList(0)),
                new Product(4, "Product A1", Arrays.asList(0, 1)),
                new Product(5, "Product A2", Arrays.asList(0, 1)),
                new Product(2, "Product B", Arrays.asList(0)),
                new Product(6, "Product B1", Arrays.asList(0, 2)),
                new Product(7, "Product B2", Arrays.asList(0, 2)),
                new Product(3, "Product C", Arrays.asList(0)),
                new Product(8, "Product C1", Arrays.asList(0, 3)),
                new Product(9, "Product C2", Arrays.asList(0, 3))
    );

   JavaRDD<Product> productsRDD = sc.parallelize(products);
   javaFunctions(productsRDD, Product.class).
   saveToCassandra("java_api", "products");

   JavaRDD<Sale> salesRDD = productsRDD.filter
   (new Function<Product, Boolean>() {
            @Override
            public Boolean call(Product product) throws Exception {
                return product.getParents().size() == 2;
            }
        }).flatMap(new FlatMapFunction<Product, Sale>() {
            @Override
            public Iterable<Sale> call(Product product) throws Exception {
                Random random = new Random();
                List<Sale> sales = new ArrayList<>(1000);
                for (int i = 0; i < 1000; i++) {
                  sales.add(new Sale(UUID.randomUUID(), 
                 product.getId(), BigDecimal.valueOf(random.nextDouble())));
                }
                return sales;
            }
        });

      javaFunctions(salesRDD, Sale.class).saveToCassandra
      ("java_api", "sales");
    }

    private void compute(JavaSparkContext sc) {
        JavaPairRDD<Integer, Product> productsRDD = javaFunctions(sc)
                .cassandraTable("java_api", "products", Product.class)
                .keyBy(new Function<Product, Integer>() {
                    @Override
                    public Integer call(Product product) throws Exception {
                        return product.getId();
                    }
                });

        JavaPairRDD<Integer, Sale> salesRDD = javaFunctions(sc)
                .cassandraTable("java_api", "sales", Sale.class)
                .keyBy(new Function<Sale, Integer>() {
                    @Override
                    public Integer call(Sale sale) throws Exception {
                        return sale.getProduct();
                    }
                });

        JavaPairRDD<Integer, Tuple2<Sale, Product>> joinedRDD = salesRDD.join(productsRDD);

        JavaPairRDD<Integer, BigDecimal> allSalesRDD = joinedRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, Integer, BigDecimal>() {
            @Override
            public Iterable<Tuple2<Integer, BigDecimal>> call(Tuple2<Integer, Tuple2<Sale, Product>> input) throws Exception {
                Tuple2<Sale, Product> saleWithProduct = input._2();
                List<Tuple2<Integer, BigDecimal>> allSales = new ArrayList<>(saleWithProduct._2().getParents().size() + 1);
                allSales.add(new Tuple2<>(saleWithProduct._1().getProduct(), saleWithProduct._1().getPrice()));
                for (Integer parentProduct : saleWithProduct._2().getParents()) {
                    allSales.add(new Tuple2<>(parentProduct, saleWithProduct._1().getPrice()));
                }
                return allSales;
            }
        });

        JavaRDD<Summary> summariesRDD = allSalesRDD.reduceByKey(new Function2<BigDecimal, BigDecimal, BigDecimal>() {
            @Override
            public BigDecimal call(BigDecimal v1, BigDecimal v2) throws Exception {
                return v1.add(v2);
            }
        }).map(new Function<Tuple2<Integer, BigDecimal>, Summary>() {
            @Override
            public Summary call(Tuple2<Integer, BigDecimal> input) throws Exception {
                return new Summary(input._1(), input._2());
            }
        });

        javaFunctions(summariesRDD, Summary.class).saveToCassandra("java_api", "summaries");
    }

    private void showResults(JavaSparkContext sc) {
        JavaPairRDD<Integer, Summary> summariesRdd = javaFunctions(sc)
                .cassandraTable("java_api", "summaries", Summary.class)
                .keyBy(new Function<Summary, Integer>() {
                    @Override
                    public Integer call(Summary summary) throws Exception {
                        return summary.getProduct();
                    }
                });

        JavaPairRDD<Integer, Product> productsRdd = javaFunctions(sc)
                .cassandraTable("java_api", "products", Product.class)
                .keyBy(new Function<Product, Integer>() {
                    @Override
                    public Integer call(Product product) throws Exception {
                        return product.getId();
                    }
                });

        List<Tuple2<Product, Optional<Summary>>> results = productsRdd.leftOuterJoin(summariesRdd).values().toArray();

        for (Tuple2<Product, Optional<Summary>> result : results) {
            System.out.println(result);
        }
    }

    public static void main(String[] args) {
//        if (args.length != 2) {
//            System.err.println("Syntax: com.datastax.spark.demo.App <Spark Master URL> <Cassandra contact point>");
//            System.exit(1);
//        }

//      SparkConf conf = new SparkConf(true)
//        .set("spark.cassandra.connection.host", "127.0.1.1")
//        .set("spark.cassandra.auth.username", "cassandra")            
//        .set("spark.cassandra.auth.password", "cassandra");

        //SparkContext sc = new SparkContext("spark://127.0.1.1:9045", "test", conf);

        //return ;

        /* try{
            SparkConf conf = new SparkConf(true); 
            conf.setAppName("Spark-Cassandra Integration");
            conf.setMaster("yarn-cluster");
            conf.set("spark.cassandra.connection.host", "192.168.1.200");
            conf.set("spark.cassandra.connection.rpc.port", "9042");
            conf.set("spark.cassandra.connection.timeout_ms", "40000");
            conf.set("spark.cassandra.read.timeout_ms", "200000");
            System.out.println("Hi.......Main Method1111...");
            conf.set("spark.cassandra.auth.username","cassandra");
            conf.set("spark.cassandra.auth.password","cassandra");
            System.out.println("Connected Successful...!\n");
            App app = new App(conf);
            app.run();
       }catch(Exception e){System.out.println(e);}*/

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
//     conf.setMaster(args[0]);
//        conf.set("spark.cassandra.connection.host", args[1]);
          conf.setMaster("spark://192.168.1.117:7077");
          conf.set("spark.cassandra.connection.host", "192.168.1.200");
          conf.set("spark.cassandra.connection.port", "9042");
          conf.set("spark.ui.port","4040");
          conf.set("spark.cassandra.auth.username","cassandra");
          conf.set("spark.cassandra.auth.password","cassandra");
       App app = new App(conf);
        app.run();
    }

    public static class Product implements Serializable {
        private Integer id;
        private String name;
        private List<Integer> parents;

        public Product() { }

        public Product(Integer id, String name, List<Integer> parents) {
            this.id = id;
            this.name = name;
            this.parents = parents;
        }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        public List<Integer> getParents() { return parents; }
        public void setParents(List<Integer> parents) { this.parents = parents; }

        @Override
        public String toString() {
            return MessageFormat.format("Product'{'id={0}, name=''{1}'', parents={2}'}'", id, name, parents);
        }
    }

    public static class Sale implements Serializable {
        private UUID id;
        private Integer product;
        private BigDecimal price;

        public Sale() { }

        public Sale(UUID id, Integer product, BigDecimal price) {
            this.id = id;
            this.product = product;
            this.price = price;
        }

        public UUID getId() { return id; }
        public void setId(UUID id) { this.id = id; }

        public Integer getProduct() { return product; }
        public void setProduct(Integer product) { this.product = product; }

        public BigDecimal getPrice() { return price; }
        public void setPrice(BigDecimal price) { this.price = price; }

        @Override
        public String toString() {
            return MessageFormat.format("Sale'{'id={0}, product={1}, price={2}'}'", id, product, price);
        }
    }

    public static class Summary implements Serializable {
        private Integer product;
        private BigDecimal summary;

        public Summary() { }

        public Summary(Integer product, BigDecimal summary) {
            this.product = product;
            this.summary = summary;
        }

        public Integer getProduct() { return product; }
        public void setProduct(Integer product) { this.product = product; }

        public BigDecimal getSummary() { return summary; }
        public void setSummary(BigDecimal summary) { this.summary = summary; }

        @Override
        public String toString() {
            return MessageFormat.format("Summary'{'product={0}, summary={1}'}'", product, summary);
        }
    }
}
Provincial answered 22/12, 2016 at 9:43 Comment(0)
