上表数据加入数据流和数据流接收更新的表,这可能吗?(Joining streaming data o

2019-10-29 06:04发布

我使用的火花SQL 2.4.1,火花卡桑德拉 - connector_2.11-2.4.1.jar和java8。 我的情况,我需要用C * / Cassandra的表格数据加入数据流。

如果记录/联接发现我需要将现有的C *表的记录复制到另一个table_bkp和更新最新数据,实际的C *表记录。

由于数据流进来我需要执行此操作。 难道这可以用火花SQL蒸做些什么呢? 如果是这样,怎么办呢? 任何警告照顾?

对于每批如何获得C *表中的数据新鲜?

什么是错的,我在这里做什么

我有两个表如下“master_table”和“backup_table”

table kspace.master_table(
    statement_id int,
    statement_flag text,
    statement_date date,
    x_val double,
    y_val double,
    z_val double,
    PRIMARY KEY (( statement_id ), statement_date)
) WITH CLUSTERING ORDER BY ( statement_date DESC );

table kspace.backup_table(
    statement_id int,
    statement_flag text,
    statement_date date,
    x_val double,
    y_val double,
    z_val double,
    backup_timestamp timestamp,
    PRIMARY KEY ((statement_id ), statement_date, backup_timestamp )
) WITH CLUSTERING ORDER BY ( statement_date DESC,   backup_timestamp DESC);


Each streaming record would have "statement_flag" which might be "I" or "U".
If record with "I" comes we directly insert into "master_table".
If record with "U" comes , need to check if there is any record for given ( statement_id ), statement_date in "master_table".
     If there is record in "master_table" copy that one to "backup_table" with current timestamp i.e. backup_timestamp
     Update the record in "master_table" with latest record.

为了实现我做的PoC /代码像下面的上方

Dataset<Row> baseDs = //streaming data from topic
Dataset<Row> i_records = baseDs.filter(col("statement_flag").equalTo("I"));
Dataset<Row> u_records = baseDs.filter(col("statement_flag").equalTo("U"));

String keyspace="kspace";
String master_table = "master_table";
String backup_table = "backup_table";


Dataset<Row> cassandraMasterTableDs = getCassandraTableData(sparkSession, keyspace , master_table);

writeDfToCassandra( baseDs.toDF(), keyspace, master_table);


u_records.createOrReplaceTempView("u_records");
cassandraMasterTableDs.createOrReplaceTempView("persisted_records");

Dataset<Row> joinUpdatedRecordsDs =  sparkSession.sql(
            " select p.statement_id, p.statement_flag, p.statement_date,"
            + "p.x_val,p.y_val,p.z_val "
            + " from persisted_records as p "
            + "join u_records as u "
            + "on p.statement_id = u.statement_id  and p.statement_date = u.statement_date");



Dataset<Row> updated_records =   joinUpdatedRecordsDs
                            .withColumn("backup_timestamp",current_timestamp());

updated_records.show(); //Showing correct results 


writeDfToCassandra( updated_records.toDF(), keyspace, backup_table);  // But here/backup_table copying the latest "master_table" records 

样本数据

对于“我”标志的第一个记录

master_table

backup_table

对于具有“U”标志,即,相同的如前面除“y_val”列中的数据的第二记录

master_table

backup_table

预期

但实际的表数据

题:

直到显示数据帧(updated_records)示出正确的数据。 但是,当我插入相同的数据框(updated_records)到表中,C * backup_table数据显示为master_table的最新记录完全相同,但假设有master_table的较早记录。

  updated_records.show(); //Showing correct results 


    writeDfToCassandra( updated_records.toDF(), keyspace, backup_table);  // But here/backup_table copying the latest "master_table" records 

所以我在做什么错在上面的程序代码?

Answer 1:

有几种方法来与这取决于你需要多少数据,以检查各个级别的性能做到这一点。

例如,如果你只分区键查找数据的最有效的事情就是用joinWithCassandraTable在DSTREAM。 对于每批这将提取匹配传入的分区键的记录。 在结构化的数据流,这将与正确书写SQL加盟,DSE自动发生。 如果DSE在使用中并没有将全力扫描每批表。

相反,如果你需要整个表中的每个批次,有CassandraRDD加入DSTREAM批次将导致RDD被完全在每个批次重新读取。 这是更昂贵的,如果整个表没有被改写。

如果您只更新记录,不检查其先前的值,它足以只需直接写输入数据到C *表。 C *使用upserts和上次写双赢的行为,如果他们存在只会覆盖以前的值。



文章来源: Joining streaming data on table data and update the table as the stream receives , is it possible?