Read & write data into cassandra using apache flin

2020-02-14 05:58发布

I intend to use apache flink for read/write data into cassandra using flink. I was hoping to use flink-connector-cassandra, I don't find good documentation/examples for the connector.

Can you please point me to the right way for read and write data from cassandra using Apache Flink. I see only sink example which are purely for write ? Is apache flink meant for reading data too from cassandra similar to apache spark ?

4条回答
再贱就再见
2楼-- · 2020-02-14 06:03
    ClusterBuilder cb = new ClusterBuilder() {
        @Override
        public Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("localhost").withPort(9042).build();
        }
    };

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    InputFormat inputFormat = new CassandraInputFormat<Tuple3<Integer, Integer, Integer>>("SELECT * FROM test.example;", cb);//, TypeInformation.of(Tuple3.class));

    DataStreamSource t = env.createInput(inputFormat,  TupleTypeInfo.of(new TypeHint<Tuple3<Integer, Integer,Integer>>() {}));
    tableEnv.registerDataStream("t1",t);
    Table t2 = tableEnv.sql("select * from t1");

    t2.printSchema();
查看更多
狗以群分
3楼-- · 2020-02-14 06:03

You can use RichFlatMapFunction to extend a class

class MongoMapper extends RichFlatMapFunction[JsonNode,JsonNode]{
  var userCollection: MongoCollection[Document] = _
  override def open(parameters: Configuration): Unit = {
// do something here like opening connection
    val client: MongoClient = MongoClient("mongodb://localhost:10000")

    userCollection = client.getDatabase("gp_stage").getCollection("users").withReadPreference(ReadPreference.secondaryPreferred())
    super.open(parameters)
  }
  override def flatMap(event: JsonNode, out: Collector[JsonNode]): Unit = {

// Do something here per record and this function can make use of objects initialized via open
      userCollection.find(Filters.eq("_id", somevalue)).limit(1).first().subscribe(
        (result: Document) => {
//          println(result)
                      },
      (t: Throwable) =>{
        println(t)
      },
        ()=>{
          out.collect(event)
        }
      )
    }


  }

}

Basically open function executes once per worker and flatmap executes it per record. The example is for mongo but can be similarly used for cassandra

查看更多
Root(大扎)
4楼-- · 2020-02-14 06:05

In your case as I understand the first step of your pipeline is reading data from Cassandra rather than writing a RichFlatMapFunction you should write your own RichSourceFunction

As a reference you can have a look at simple implementation of WikipediaEditsSource.

查看更多
Summer. ? 凉城
5楼-- · 2020-02-14 06:23

I had the same question, and this is what I was looking for. I don't know if it is over simplified for what you need, but figured I should show it none the less.

ClusterBuilder cb = new ClusterBuilder() {
        @Override
        public Cluster buildCluster(Cluster.Builder builder) {
            return builder.addContactPoint("urlToUse.com").withPort(9042).build();
        }
    };

    CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat = new CassandraInputFormat<>("SELECT * FROM example.cassandraconnectorexample", cb);

    cassandraInputFormat.configure(null);
    cassandraInputFormat.open(null);

    Tuple2<String, String> testOutputTuple = new Tuple2<>();
    cassandraInputFormat.nextRecord(testOutputTuple);

    System.out.println("column1: " + testOutputTuple.f0);
    System.out.println("column2: " + testOutputTuple.f1);

The way I figured this out was thanks to finding the code for the "CassandraInputFormat" class and seeing how it worked (http://www.javatips.net/api/flink-master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java). I honestly expected it to just be a format and not the full class of reading from Cassandra based on the name, and I have a feeling others might be thinking the same thing.

查看更多
登录 后发表回答