Read & write data into cassandra using apache flin

2020-02-14 05:58发布

I intend to use apache flink for read/write data into cassandra using flink. I was hoping to use flink-connector-cassandra, I don't find good documentation/examples for the connector.

Can you please point me to the right way for read and write data from cassandra using Apache Flink. I see only sink example which are purely for write ? Is apache flink meant for reading data too from cassandra similar to apache spark ?

2楼-- · 2020-02-14 06:03
    ClusterBuilder cb = new ClusterBuilder() {
        public Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("localhost").withPort(9042).build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    InputFormat inputFormat = new CassandraInputFormat<Tuple3<Integer, Integer, Integer>>("SELECT * FROM test.example;", cb);//, TypeInformation.of(Tuple3.class));

    DataStreamSource t = env.createInput(inputFormat,  TupleTypeInfo.of(new TypeHint<Tuple3<Integer, Integer,Integer>>() {}));
    Table t2 = tableEnv.sql("select * from t1");

3楼-- · 2020-02-14 06:03

You can use RichFlatMapFunction to extend a class

class MongoMapper extends RichFlatMapFunction[JsonNode,JsonNode]{
  var userCollection: MongoCollection[Document] = _
  override def open(parameters: Configuration): Unit = {
// do something here like opening connection
    val client: MongoClient = MongoClient("mongodb://localhost:10000")

    userCollection = client.getDatabase("gp_stage").getCollection("users").withReadPreference(ReadPreference.secondaryPreferred())
  override def flatMap(event: JsonNode, out: Collector[JsonNode]): Unit = {

// Do something here per record and this function can make use of objects initialized via open
      userCollection.find(Filters.eq("_id", somevalue)).limit(1).first().subscribe(
        (result: Document) => {
//          println(result)
      (t: Throwable) =>{



Basically open function executes once per worker and flatmap executes it per record. The example is for mongo but can be similarly used for cassandra

4楼-- · 2020-02-14 06:05

In your case as I understand the first step of your pipeline is reading data from Cassandra rather than writing a RichFlatMapFunction you should write your own RichSourceFunction

As a reference you can have a look at simple implementation of WikipediaEditsSource.

Summer. ? 凉城
5楼-- · 2020-02-14 06:23

I had the same question, and this is what I was looking for. I don't know if it is over simplified for what you need, but figured I should show it none the less.

ClusterBuilder cb = new ClusterBuilder() {
        public Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("").withPort(9042).build();

    CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat = new CassandraInputFormat<>("SELECT * FROM example.cassandraconnectorexample", cb);


    Tuple2<String, String> testOutputTuple = new Tuple2<>();

    System.out.println("column1: " + testOutputTuple.f0);
    System.out.println("column2: " + testOutputTuple.f1);

The way I figured this out was thanks to finding the code for the "CassandraInputFormat" class and seeing how it worked ( I honestly expected it to just be a format and not the full class of reading from Cassandra based on the name, and I have a feeling others might be thinking the same thing.

登录 后发表回答