如何在shell中加载Spark Cassandra Connector?

28

我正在尝试在Spark 1.1.0中使用Spark Cassandra Connector

我已经成功地从GitHub的主分支构建了jar文件,并使包含的演示程序正常工作。 但是,当我尝试将jar文件加载到spark-shell中时,无法从com.datastax.spark.connector包中导入任何类。

我尝试使用--jars选项在spark-shell上添加jar文件目录并将其添加到Java的CLASSPATH中,但这两个选项都不起作用。 实际上,当我使用--jars选项时,日志输出显示Datastax jar已加载,但我仍然无法导入来自com.datastax的任何内容。

我已经能够使用--jars将Tuplejump Calliope Cassandra连接器加载到spark-shell中,所以我知道它是有效的。 只是对于我而言,Datastax连接器失败了。


我也是。我使用sbt构建了spark-cassandra-connector。我使用命令$ ./spark-shell --jars ~/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector_2.10-1.2.0-SNAPSHOT.jar,并在日志中看到了这个信息INFO spark.SparkContext: Added JAR file:/root/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector_2.10-1.2.0-SNAPSHOT.jar at http://xx.xx.xx.xx:60296/jars/spark-cassandra-connector_2.10-1.2.0-SNAPSHOT.jar with timestamp 1414618174823,但仍然无法import com.datastax.spark.connector._。我正在使用Spark 1.1.0。 - Lishu
6个回答

28

我明白了。以下是我的做法:

$ git clone https://github.com/datastax/spark-cassandra-connector.git
$ cd spark-cassandra-connector
$ sbt/sbt assembly
$ $SPARK_HOME/bin/spark-shell --jars ~/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/connector-assembly-1.2.0-SNAPSHOT.jar 

在Scala提示符中,

scala> sc.stop
scala> import com.datastax.spark.connector._
scala> import org.apache.spark.SparkContext
scala> import org.apache.spark.SparkContext._
scala> import org.apache.spark.SparkConf
scala> val conf = new SparkConf(true).set("spark.cassandra.connection.host", "my cassandra host")
scala> val sc = new SparkContext("spark://spark host:7077", "test", conf)

最终这个组合对我来说是可行的:spark-1.2.1和spark-cassandra-connector-1.2.0-rc2。 - nils petersohn
15/10/23 17:26:18 警告 AppClient$ClientEndpoint:无法连接到主节点 localhost:7077 - Justin Thomas
请参考@chris-batey的答案 - 这是在shell中使用sc.broadcast的唯一方法。如果您停止SparkContext并创建一个新的,如果尝试使用sc.broadcast,则会出现NotSerialisableException - Matti Lyra

18

编辑:现在事情变得更容易了

如需详细说明,请查看项目网站 https://github.com/datastax/spark-cassandra-connector/blob/master/doc/13_spark_shell.md

或随意使用Spark Packages加载库(并非所有版本都已发布) http://spark-packages.org/package/datastax/spark-cassandra-connector

> $SPARK_HOME/bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.10:1.4.0-M3-s_2.10

以下内容假设您正在使用OSS Apache C*

您需要使用 –driver-class-path 启动类,并将其设置为包括所有连接器库

我会引用著名的Amy Tobey的博客文章:

我发现最简单的方法是设置类路径,然后重新启动REPL中的上下文,并导入必要的类使 sc.cassandraTable() 可见。 新加载的方法不会出现在制表符完成中。 我不知道为什么。

  /opt/spark/bin/spark-shell --driver-class-path $(echo /path/to/connector/*.jar |sed 's/ /:/g')
它会打印一大堆日志信息,然后呈现scala>提示符。

它会打印一大堆日志信息,然后呈现scala>提示符。

scala> sc.stop

现在上下文已停止,是时候引入连接器了。

scala> import com.datastax.spark.connector._
scala> val conf = new SparkConf()
scala> conf.set("cassandra.connection.host", "node1.pc.datastax.com")
scala> val sc = new SparkContext("local[2]", "Cassandra Connector Test", conf)
scala> val table = sc.cassandraTable("keyspace", "table")
scala> table.count

如果您使用的是DSE < 4.5.1

DSE类加载器和以前的包命名约定存在一些小问题,这将阻止您找到新的spark-connector库。您可以通过删除在启动spark-shell脚本中指定DSE类加载器的行来解决此问题。


2
我尝试了这个,但不幸的是,我仍然无法导入 com.datastax 下的任何内容。我的设置肯定有问题,所以我将从头再试一遍。 - egerhard
如果您正在使用Spark 1.1,则需要使用最新的连接器alpha版本才能使其正常工作! - elmalto
2
我被困在 error: object datastax is not a member of package com 这个问题上了。有人能指引一下我吗?我正在使用Spark 1.3。 - optimist
@optimist 因为导入了错误的库而遇到了这个问题。我导入了 spark-assembly 而不是 spark-cassandra-assembly。 - Lyuben Todorov
本地主机方向? - Justin Thomas

6
如果你想避免在shell中停止/启动上下文,你也可以将其添加到以下位置的spark属性中:
{spark_install}/conf/spark-defaults.conf
spark.cassandra.connection.host=192.168.10.10

太棒了 - 这实际上是我发现连接到cassandra的唯一正确方法。以上所有方法都导致sc不可序列化,广播变量在spark-shell中无法工作。 - Matti Lyra
1
当启动shell时,您实际上也可以使用--conf spark.cassandra.connection.host=127.0.0.1提供连接主机。https://spark.apache.org/docs/latest/configuration.html#dynamically-loading-spark-properties - Matti Lyra

5

要从spark-shell访问Cassandra,我已经构建了一个包含所有依赖项的cassandra-spark-driver汇总成的程序集(即“uberjar”)。使用以下方式将其提供给spark-shell:

--jars 选项。

spark-shell --jars spark-cassandra-assembly-1.0.0-SNAPSHOT-jar-with-dependencies.jar

我遇到了与这里描述的相同的问题,这种方法既简单又方便(而不是加载冗长的依赖项列表)。
我创建了一个包含POM文件的gist,您可以下载。使用pom创建uberjar应该这样做:
mvn package

如果您正在使用sbt,请查看sbt-assembly插件。

0
以下步骤描述了如何设置同时具有Spark节点和Cassandra节点的服务器。 设置开源Spark 假设您已经设置好了Cassandra。 步骤1:下载并设置Spark。
Go to http://spark.apache.org/downloads.html.

a) 为了简化事情,我们将使用其中一个预构建的Spark包。选择Spark版本2.0.0和Pre-built for Hadoop 2.7,然后进行直接下载。这将下载一个包含Spark构建二进制文件的存档。

b) 将其提取到您选择的目录中。我会把它放在~/apps/spark-1.2中

c) 通过打开Shell测试Spark是否正常工作

步骤2:测试Spark是否正常工作

a) 进入Spark目录,运行"./bin/spark-shell"。这将打开Spark交互式shell程序

b) 如果一切正常,它应该显示这个提示:"scala>"

运行一个简单的计算:

sc.parallelize(1 to 50).sum(+),这应该输出1250。

c) 恭喜,Spark正在工作!使用命令“exit”退出Spark shell

Spark Cassandra Connector

要将Spark连接到Cassandra集群,需要将Cassandra Connector添加到Spark项目中。DataStax在GitHub上提供了他们自己的Cassandra Connector,我们将使用它。

  1. 克隆Spark Cassandra Connector存储库:

    https://github.com/datastax/spark-cassandra-connector

  2. 进入“spark-cassandra-connector”目录,通过执行以下命令构建Spark Cassandra Connector:

    ./sbt/sbt Dscala-2.11=true assembly

这将在名为“target”的目录中输出已编译的jar文件。将会有两个jar文件,一个用于Scala,另一个用于Java。 我们感兴趣的jar文件是: "spark-cassandra-connector-assembly-1.1.1-SNAPSHOT.jar",即Scala版本的jar文件。 将该jar文件移动到易于查找的目录中:我将其放在了~/apps/spark-1.2/jars目录下。

将连接器加载到Spark Shell中:

使用以下命令启动Shell:

../bin/spark-shell –jars ~/apps/spark-1.2/jars/spark-cassandra-connector-assembly-1.1.1-SNAPSHOT.jar

将Spark Context连接到Cassandra集群并停止默认上下文:

sc.stop

导入必要的jar文件:

import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf

使用Cassandra连接细节创建新的SparkConf:

val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")

创建一个新的Spark上下文:

val sc = new SparkContext(conf)

现在您拥有一个连接到Cassandra集群的新Spark上下文。


0

Spark-Cassandra-Connector完整的JAVA代码,适用于Windows 7、8、10,非常有用。

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.google.common.base.Optional;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
import spark_conn.Spark_connection;
import java.io.Serializable;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.*;
import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App implements Serializable
{
    private transient SparkConf conf;

    private App(SparkConf conf) {
        this.conf = conf;
    }

    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        generateData(sc);
        compute(sc);
        showResults(sc);
        sc.stop();
    }

    private void generateData(JavaSparkContext sc) {
    CassandraConnector connector =   CassandraConnector.apply(sc.getConf());

        // Prepare the schema
   try{ 
   Session session=connector.openSession();
   session.execute("DROP KEYSPACE IF EXISTS java_api");
   session.execute("CREATE KEYSPACE java_api WITH 
   replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
   session.execute("CREATE TABLE java_api.products 
   (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
   session.execute("CREATE TABLE java_api.sales 
   (id UUID PRIMARY KEY,  product INT, price DECIMAL)");
   session.execute("CREATE TABLE java_api.summaries 
   (product INT PRIMARY KEY, summary DECIMAL)");
  }catch(Exception e){System.out.println(e);}

        // Prepare the products hierarchy
   List<Product> products = Arrays.asList(
   new Product(0, "All products", Collections.<Integer>emptyList()),
                new Product(1, "Product A", Arrays.asList(0)),
                new Product(4, "Product A1", Arrays.asList(0, 1)),
                new Product(5, "Product A2", Arrays.asList(0, 1)),
                new Product(2, "Product B", Arrays.asList(0)),
                new Product(6, "Product B1", Arrays.asList(0, 2)),
                new Product(7, "Product B2", Arrays.asList(0, 2)),
                new Product(3, "Product C", Arrays.asList(0)),
                new Product(8, "Product C1", Arrays.asList(0, 3)),
                new Product(9, "Product C2", Arrays.asList(0, 3))
    );

   JavaRDD<Product> productsRDD = sc.parallelize(products);
   javaFunctions(productsRDD, Product.class).
   saveToCassandra("java_api", "products");

   JavaRDD<Sale> salesRDD = productsRDD.filter
   (new Function<Product, Boolean>() {
            @Override
            public Boolean call(Product product) throws Exception {
                return product.getParents().size() == 2;
            }
        }).flatMap(new FlatMapFunction<Product, Sale>() {
            @Override
            public Iterable<Sale> call(Product product) throws Exception {
                Random random = new Random();
                List<Sale> sales = new ArrayList<>(1000);
                for (int i = 0; i < 1000; i++) {
                  sales.add(new Sale(UUID.randomUUID(), 
                 product.getId(), BigDecimal.valueOf(random.nextDouble())));
                }
                return sales;
            }
        });

      javaFunctions(salesRDD, Sale.class).saveToCassandra
      ("java_api", "sales");
    }

    private void compute(JavaSparkContext sc) {
        JavaPairRDD<Integer, Product> productsRDD = javaFunctions(sc)
                .cassandraTable("java_api", "products", Product.class)
                .keyBy(new Function<Product, Integer>() {
                    @Override
                    public Integer call(Product product) throws Exception {
                        return product.getId();
                    }
                });

        JavaPairRDD<Integer, Sale> salesRDD = javaFunctions(sc)
                .cassandraTable("java_api", "sales", Sale.class)
                .keyBy(new Function<Sale, Integer>() {
                    @Override
                    public Integer call(Sale sale) throws Exception {
                        return sale.getProduct();
                    }
                });

        JavaPairRDD<Integer, Tuple2<Sale, Product>> joinedRDD = salesRDD.join(productsRDD);

        JavaPairRDD<Integer, BigDecimal> allSalesRDD = joinedRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, Integer, BigDecimal>() {
            @Override
            public Iterable<Tuple2<Integer, BigDecimal>> call(Tuple2<Integer, Tuple2<Sale, Product>> input) throws Exception {
                Tuple2<Sale, Product> saleWithProduct = input._2();
                List<Tuple2<Integer, BigDecimal>> allSales = new ArrayList<>(saleWithProduct._2().getParents().size() + 1);
                allSales.add(new Tuple2<>(saleWithProduct._1().getProduct(), saleWithProduct._1().getPrice()));
                for (Integer parentProduct : saleWithProduct._2().getParents()) {
                    allSales.add(new Tuple2<>(parentProduct, saleWithProduct._1().getPrice()));
                }
                return allSales;
            }
        });

        JavaRDD<Summary> summariesRDD = allSalesRDD.reduceByKey(new Function2<BigDecimal, BigDecimal, BigDecimal>() {
            @Override
            public BigDecimal call(BigDecimal v1, BigDecimal v2) throws Exception {
                return v1.add(v2);
            }
        }).map(new Function<Tuple2<Integer, BigDecimal>, Summary>() {
            @Override
            public Summary call(Tuple2<Integer, BigDecimal> input) throws Exception {
                return new Summary(input._1(), input._2());
            }
        });

        javaFunctions(summariesRDD, Summary.class).saveToCassandra("java_api", "summaries");
    }

    private void showResults(JavaSparkContext sc) {
        JavaPairRDD<Integer, Summary> summariesRdd = javaFunctions(sc)
                .cassandraTable("java_api", "summaries", Summary.class)
                .keyBy(new Function<Summary, Integer>() {
                    @Override
                    public Integer call(Summary summary) throws Exception {
                        return summary.getProduct();
                    }
                });

        JavaPairRDD<Integer, Product> productsRdd = javaFunctions(sc)
                .cassandraTable("java_api", "products", Product.class)
                .keyBy(new Function<Product, Integer>() {
                    @Override
                    public Integer call(Product product) throws Exception {
                        return product.getId();
                    }
                });

        List<Tuple2<Product, Optional<Summary>>> results = productsRdd.leftOuterJoin(summariesRdd).values().toArray();

        for (Tuple2<Product, Optional<Summary>> result : results) {
            System.out.println(result);
        }
    }

    public static void main(String[] args) {
//        if (args.length != 2) {
//            System.err.println("Syntax: com.datastax.spark.demo.App <Spark Master URL> <Cassandra contact point>");
//            System.exit(1);
//        }

//      SparkConf conf = new SparkConf(true)
//        .set("spark.cassandra.connection.host", "127.0.1.1")
//        .set("spark.cassandra.auth.username", "cassandra")            
//        .set("spark.cassandra.auth.password", "cassandra");

        //SparkContext sc = new SparkContext("spark://127.0.1.1:9045", "test", conf);

        //return ;

        /* try{
            SparkConf conf = new SparkConf(true); 
            conf.setAppName("Spark-Cassandra Integration");
            conf.setMaster("yarn-cluster");
            conf.set("spark.cassandra.connection.host", "192.168.1.200");
            conf.set("spark.cassandra.connection.rpc.port", "9042");
            conf.set("spark.cassandra.connection.timeout_ms", "40000");
            conf.set("spark.cassandra.read.timeout_ms", "200000");
            System.out.println("Hi.......Main Method1111...");
            conf.set("spark.cassandra.auth.username","cassandra");
            conf.set("spark.cassandra.auth.password","cassandra");
            System.out.println("Connected Successful...!\n");
            App app = new App(conf);
            app.run();
       }catch(Exception e){System.out.println(e);}*/

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
//     conf.setMaster(args[0]);
//        conf.set("spark.cassandra.connection.host", args[1]);
          conf.setMaster("spark://192.168.1.117:7077");
          conf.set("spark.cassandra.connection.host", "192.168.1.200");
          conf.set("spark.cassandra.connection.port", "9042");
          conf.set("spark.ui.port","4040");
          conf.set("spark.cassandra.auth.username","cassandra");
          conf.set("spark.cassandra.auth.password","cassandra");
       App app = new App(conf);
        app.run();
    }

    public static class Product implements Serializable {
        private Integer id;
        private String name;
        private List<Integer> parents;

        public Product() { }

        public Product(Integer id, String name, List<Integer> parents) {
            this.id = id;
            this.name = name;
            this.parents = parents;
        }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        public List<Integer> getParents() { return parents; }
        public void setParents(List<Integer> parents) { this.parents = parents; }

        @Override
        public String toString() {
            return MessageFormat.format("Product'{'id={0}, name=''{1}'', parents={2}'}'", id, name, parents);
        }
    }

    public static class Sale implements Serializable {
        private UUID id;
        private Integer product;
        private BigDecimal price;

        public Sale() { }

        public Sale(UUID id, Integer product, BigDecimal price) {
            this.id = id;
            this.product = product;
            this.price = price;
        }

        public UUID getId() { return id; }
        public void setId(UUID id) { this.id = id; }

        public Integer getProduct() { return product; }
        public void setProduct(Integer product) { this.product = product; }

        public BigDecimal getPrice() { return price; }
        public void setPrice(BigDecimal price) { this.price = price; }

        @Override
        public String toString() {
            return MessageFormat.format("Sale'{'id={0}, product={1}, price={2}'}'", id, product, price);
        }
    }

    public static class Summary implements Serializable {
        private Integer product;
        private BigDecimal summary;

        public Summary() { }

        public Summary(Integer product, BigDecimal summary) {
            this.product = product;
            this.summary = summary;
        }

        public Integer getProduct() { return product; }
        public void setProduct(Integer product) { this.product = product; }

        public BigDecimal getSummary() { return summary; }
        public void setSummary(BigDecimal summary) { this.summary = summary; }

        @Override
        public String toString() {
            return MessageFormat.format("Summary'{'product={0}, summary={1}'}'", product, summary);
        }
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接