Delete from Cassandra table in Spark

Posted 2020-07-18 04:48

Question:

I'm using Spark with Cassandra, and I'm reading some rows from my table in order to delete them by primary key. This is my code:

val lines = sc.cassandraTable[(String, String, String, String)](CASSANDRA_SCHEMA, table).
  select("a","b","c","d").
  where("d=?", d).cache()

lines.foreach(r => {
    val session: Session = connector.openSession
    val delete = s"DELETE FROM "+CASSANDRA_SCHEMA+"."+table+" where channel='"+r._1 +"' and ctid='"+r._2+"'and cvid='"+r._3+"';"
    session.execute(delete)
    session.close()
})

But this method creates a session for each row, and it takes a lot of time. Is it possible to delete my rows using sc.cassandraTable, or is there another solution better than mine?

Thank you

Answer 1:

I don't think the Cassandra connector supports deletes at the moment. To amortize the cost of connection setup, the recommended approach is to apply the operation once per partition rather than once per row.

So your code will look like this:

lines.foreachPartition(partition => {
    val session: Session = connector.openSession // opened once per partition, not once per row
    partition.foreach { elem =>
        // Same DELETE as in the question, built with string interpolation
        val delete = s"DELETE FROM $CASSANDRA_SCHEMA.$table WHERE channel='${elem._1}' AND ctid='${elem._2}' AND cvid='${elem._3}';"
        session.execute(delete)
    }
    session.close()
})

You could also look into using DELETE FROM ... WHERE pk IN (list), building up the list for each partition in a similar way. This should be even more performant, but it might break with very large partitions, as the list will become correspondingly long. Repartitioning your target RDD before applying this function will help.
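
To make the IN (list) idea a bit more concrete, here is a rough sketch in plain Java of how the keys seen in one partition could be split into bounded chunks, so that no single statement gets an overly long list. Everything in it is an assumption for illustration: the class and method names (InListDeletes, buildDeletes), the single string key column, and the chunk size are not from the question, and it only builds the CQL text. Executing each statement would still go through the session opened once per partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Spark Cassandra connector.
class InListDeletes {

    // Renders one "DELETE ... WHERE keyColumn IN (...)" statement per chunk
    // of at most maxKeysPerStatement keys. Keys are assumed to be text values.
    static List<String> buildDeletes(String table, String keyColumn,
                                     List<String> keys, int maxKeysPerStatement) {
        if (maxKeysPerStatement < 1) {
            throw new IllegalArgumentException("maxKeysPerStatement must be >= 1");
        }
        List<String> statements = new ArrayList<>();
        for (int start = 0; start < keys.size(); start += maxKeysPerStatement) {
            int end = Math.min(start + maxKeysPerStatement, keys.size());
            StringBuilder inList = new StringBuilder();
            for (String key : keys.subList(start, end)) {
                if (inList.length() > 0) {
                    inList.append(", ");
                }
                // CQL string literals escape a single quote by doubling it.
                inList.append('\'').append(key.replace("'", "''")).append('\'');
            }
            statements.add("DELETE FROM " + table + " WHERE " + keyColumn
                    + " IN (" + inList + ");");
        }
        return statements;
    }
}
```

Inside foreachPartition you would collect the keys of the partition into a list, call buildDeletes with a chunk size you are comfortable with, and execute each returned statement. The chunk size is a tuning knob: a smaller value means more round trips, a larger one means longer statements.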



Answer 2:

You asked this a long time ago, so you have probably found an answer already. :P Just to share, here is what I did in Java. This code works beautifully against my local Cassandra instance. It does not work against our BETA or PRODUCTION environments, though; I suspect that is because they run multiple Cassandra instances, so the delete was applied to only one instance and the data was then replicated right back. :(

Please do share if you were able to get it to work against a production Cassandra environment with multiple instances running!

public static void deleteFromCassandraTable(Dataset myData, SparkConf sparkConf){
    CassandraConnector connector = CassandraConnector.apply(sparkConf);
    myData.foreachPartition(partition -> {
        Session session = connector.openSession();

        while(partition.hasNext()) {
            Row row = (Row) partition.next();
            boolean isTested = (boolean) row.get(0);
            String product = (String) row.get(1);
            long reportDateInMillSeconds = ((Timestamp) row.get(2)).getTime();
            String id = (String) row.get(3);

            String deleteMyData = "DELETE FROM test.my_table"
                    + " WHERE is_tested=" + isTested
                    + " AND product='" + product + "'"
                    + " AND report_date=" + reportDateInMillSeconds
                    + " AND id=" + id + ";";

            System.out.println("%%% " + deleteMyData);
            ResultSet deleteResult = session.execute(deleteMyData);
            boolean result = deleteResult.wasApplied();
            System.out.println("%%% deleteResult =" + result);
        }
        session.close();
    });
}
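
One fragile spot in the code above is that product is spliced into the statement as-is, so a value containing a single quote would produce invalid CQL. Below is a small sketch of building the same statement with the text value escaped. The class and method names (DeleteStatements, buildDelete) are made up for illustration, and it assumes, as the original code does, that id is a column type whose literal is written without quotes (a uuid, for example). Prepared statements with bound values are the usual way to avoid this kind of problem altogether; this only shows the string-building variant.

```java
// Hypothetical helper mirroring the DELETE built in deleteFromCassandraTable.
class DeleteStatements {

    // Wraps a text value in single quotes, doubling any embedded single quote
    // as CQL string literals require.
    static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    // Builds the same statement text as the answer's code, with product escaped.
    // id is left unquoted on the assumption that it is a uuid-like column.
    static String buildDelete(boolean isTested, String product,
                              long reportDateMillis, String id) {
        return "DELETE FROM test.my_table"
                + " WHERE is_tested=" + isTested
                + " AND product=" + quote(product)
                + " AND report_date=" + reportDateMillis
                + " AND id=" + id + ";";
    }
}
```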