使用Apache Flink Java API向Cassandra读写数据

5
我打算使用Apache Flink来读写Cassandra中的数据。我希望使用flink-connector-cassandra,但是我没有找到好的连接器文档/示例。
请问您能否指导我正确地使用Apache Flink从Cassandra读写数据?我只看到了纯粹用于写入的sink示例?Apache Flink是否也可以像Apache Spark一样从Cassandra中读取数据?

你看过这份文档和代码示例了吗? - user909481
这个例子只涉及到写入(插入)操作,我也需要读取操作。 - Vinod Jayachandran
链接的文档是关于流式API的,而Flink仅提供了一个sink。对于批处理(DataSet)API,有Cassandra输入/输出格式,您可以潜在地重复使用它们。 - Chesnay Schepler
4个回答

5

我有同样的问题,这就是我一直在寻找的答案。我不知道它是否过于简单了,但是我认为我应该展示它。

ClusterBuilder cb = new ClusterBuilder() {
        @Override
        public Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("urlToUse.com").withPort(9042).build();
        }
    };

    CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat = new CassandraInputFormat<>("SELECT * FROM example.cassandraconnectorexample", cb);

    cassandraInputFormat.configure(null);
    cassandraInputFormat.open(null);

    Tuple2<String, String> testOutputTuple = new Tuple2<>();
    cassandraInputFormat.nextRecord(testOutputTuple);

    System.out.println("column1: " + testOutputTuple.f0);
    System.out.println("column2: " + testOutputTuple.f1);

我找到了“CassandraInputFormat”类的代码并研究了它的工作方式,这是我找到答案的方法(http://www.javatips.net/api/flink-master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java)。根据名称,我最初认为它只是一种格式而不是从Cassandra读取的完整类,并且我有一种感觉其他人可能也会这样想。

我能否将POJO写入Cassandra,而不必使用Tuple2<String, String>格式,而是返回其自己的事件类型? - Amarjit Dhillon
是的,但需要定制代码。我对Cassandra输出格式化程序进行了版本更新,使用POJO代替元组,并对CassandraOutputFormat代码进行了少量修改。 - Jicaar

2
    ClusterBuilder cb = new ClusterBuilder() {
        @Override
        public Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("localhost").withPort(9042).build();
        }
    };

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    InputFormat inputFormat = new CassandraInputFormat<Tuple3<Integer, Integer, Integer>>("SELECT * FROM test.example;", cb);//, TypeInformation.of(Tuple3.class));

    DataStreamSource t = env.createInput(inputFormat,  TupleTypeInfo.of(new TypeHint<Tuple3<Integer, Integer,Integer>>() {}));
    tableEnv.registerDataStream("t1",t);
    Table t2 = tableEnv.sql("select * from t1");

    t2.printSchema();

0

根据我的理解,你的管道的第一步是从Cassandra读取数据而不是写入RichFlatMapFunction,因此你应该编写自己的RichSourceFunction

作为参考,你可以查看WikipediaEditsSource的简单实现。


0
您可以使用RichFlatMapFunction来扩展一个类。
class MongoMapper extends RichFlatMapFunction[JsonNode,JsonNode]{
  var userCollection: MongoCollection[Document] = _
  override def open(parameters: Configuration): Unit = {
// do something here like opening connection
    val client: MongoClient = MongoClient("mongodb://localhost:10000")

    userCollection = client.getDatabase("gp_stage").getCollection("users").withReadPreference(ReadPreference.secondaryPreferred())
    super.open(parameters)
  }
  override def flatMap(event: JsonNode, out: Collector[JsonNode]): Unit = {

// Do something here per record and this function can make use of objects initialized via open
      userCollection.find(Filters.eq("_id", somevalue)).limit(1).first().subscribe(
        (result: Document) => {
//          println(result)
                      },
      (t: Throwable) =>{
        println(t)
      },
        ()=>{
          out.collect(event)
        }
      )
    }


  }

}

基本上,open 函数每个工作进程只执行一次,而 flatmap 每条记录都会执行一次。这个例子是针对 MongoDB 的,但同样适用于 Cassandra。


谢谢 @Gaurav,你能给我提供一个 Java 的类似示例吗? - Vinod Jayachandran
1
http://stackoverflow.com/questions/34224423/apache-flink-executing-a-program-which-extends-the-richflatmapfunction-on-the-r - Gaurav Shah
谢谢 @Gaurav - Vinod Jayachandran
如果这对您有所帮助,您能否给它点个赞或接受它作为答案? - Gaurav Shah

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接