using prepared statement multiple times, giving a

2020-01-29 22:10发布

问题:

I am getting data from somewhere and inserting it into cassandra daily basis then I need to retrieve the data from cassandra for whole week and do some processing and insert result back onto cassandra.

i have lot of records, each record executing most of the below operations.

To do this I have written a program below its working fine but I get warning and according to API document should not use prepare statement multiple time its reducing performance.

Please tell me how to avoid this to improve the performance OR suggest me any alternative approach to achieve this in scala.

Here is some part of my code:

object CassandraUtils {
  println("##########entered cassandrutils")

  val selectQuery = "select * from k1.table1 where s_id = ? and a_id = ? and summ_typ = ? and summ_dt >= ? and summ_dt <= ?;"
  val selectTripQuery = "select * from k1.tale1 where s_id = ? and a_id = ? and summ_typ = ? and summ_dt = ? and t_summ_id = ?;"

  val insertQuery = "insert into k1.table1 (s_id, a_id, summ_typ, summ_dt, t_summ_id, a_s_no, avg_sp, c_dist, c_epa, c_gal, c_mil, d_id, d_s_no, dist, en_dt, en_lat, en_long, epa, gal, h_dist, h_epa,h_gal, h_mil, id_tm, max_sp, mil, rec_crt_dt, st_lat, st_long, tr_dis, tr_dt, tr_dur,st_addr,en_addr) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,?);"
  val updateQuery = "update k1.table1 set tr_dur=?,id_tm=?,max_sp=?,c_dist=?,h_dist=?,dist=?,c_gal=?,c_mil=?,h_gal=?,h_mil=?,c_epa=?,h_epa=?,epa=?,gal=?,rec_crt_dt=?,mil=?,avg_sp=?,tr_dis=?,en_lat=?,en_long=? where s_id= ? and a_id= ? and summ_typ= ? and  summ_dt= ? and t_summ_id=?; "

  val dashboardSelectQuery: String = "select * from k1.table2 where s_id = ? and a_id = ? and hlth_typ= ? and hlth_s_typ= ?;"
  val insertDashBoardQuery = "insert into k1.table2 (s_id, a_id, hlth_typ, hlth_s_typ, dsh_nval_01, rec_crt_dt, lst_rfr_dt, a_s_no) values (? ,?, ?, ?, ?, ?, ?, ?);"
  val updateDashBoardQuery = "update k1.table2 set dsh_nval_01= ?, lst_rfr_dt= ? where s_id= ? and a_id= ? and hlth_typ= ? and hlth_s_typ= ?;"

  val dInfoSelectQuery = "select d_s_no,d_type,a_id,d_id,s_id from k2.table3 where d_s_no = ?"

  def insert(session: Session, data: THData, batch: BatchStatement) {
    val insertStatement = session.prepare(insertQuery)
    //insertStatement.setConsistencyLevel(ConsistencyLevel.QUORUM)
    //println("data.st_addr,data.en_addr: ------------------->>>>>>  " + data.st_addr, data.en_addr)
    val boundStatement = new BoundStatement(insertStatement)
    //session.execute(boundStatement.bind( data.s_id, data.a_id, data.summ_typ, data.summ_dt, data.t_summ_id, data.a_s_no, data.avg_sp, data.c_dist, data.c_epa, data.c_gal, data.c_mil, data.d_id, data.d_s_no, data.dist, data.en_dt, data.en_lat, data.en_long, data.epa, data.gal, data.h_dist, data.h_epa,data.h_gal, data.h_mil, data.id_tm, data.max_sp, data.mil, data.rec_crt_dt, data.st_lat, data.st_long, data.tr_dis, data.tr_dt, data.tr_dur,data.st_addr,data.en_addr))
    batch.add(boundStatement.bind(data.s_id, data.a_id, data.summ_typ, data.summ_dt, data.t_summ_id, data.a_s_no, data.avg_sp, data.c_dist, data.c_epa, data.c_gal, data.c_mil, data.d_id, data.d_s_no, data.dist, data.en_dt, data.en_lat, data.en_long, data.epa, data.gal, data.h_dist, data.h_epa, data.h_gal, data.h_mil, data.id_tm, data.max_sp, data.mil, data.rec_crt_dt, data.st_lat, data.st_long, data.tr_dis, data.tr_dt, data.tr_dur, data.st_addr, data.en_addr))
  }

  def update(session: Session, data: THData, batch: BatchStatement) {
    val updateStatement = session.prepare(updateQuery)
    //insertStatement.setConsistencyLevel(ConsistencyLevel.QUORUM)
    val boundStatement = new BoundStatement(updateStatement)
    //session.execute(boundStatement.bind( data.tr_dur, data.id_tm, data.max_sp, data.c_dist, data.h_dist, data.dist, data.c_gal, data.c_mil, data.h_gal, data.h_mil, data.c_epa, data.h_epa, data.epa, data.gal, data.rec_crt_dt, data.mil, data.avg_sp, data.tr_dis,data.en_lat, data.en_long, data.s_id,data.a_id, data.summ_typ, data.summ_dt, data.t_summ_id ))
    batch.add(boundStatement.bind(data.tr_dur, data.id_tm, data.max_sp, data.c_dist, data.h_dist, data.dist, data.c_gal, data.c_mil, data.h_gal, data.h_mil, data.c_epa, data.h_epa, data.epa, data.gal, data.rec_crt_dt, data.mil, data.avg_sp, data.tr_dis, data.en_lat, data.en_long, data.s_id, data.a_id, data.summ_typ, data.summ_dt, data.t_summ_id))
  }

  def getQueryData(session: Session, im: String): (Long, String, String, String) = {
    //println("query---->>>> :" + dInfoSelectQuery)
    val selectStatement = session.prepare(dInfoSelectQuery)
    val boundStatement = new BoundStatement(selectStatement)
    val result: ResultSet = session.execute(boundStatement.bind(im))
    val row = result.one()
    (row.getLong("s_id"), row.getString("a_id"), row.getString("d_id"), row.getString("d_s_no"))
  }

  def getDashBoardData(session: Session, Data: THData): AssetDashboardData = {
    //println("query---->>>> :" + dashboardSelectQuery)
    val selectStatement = session.prepare(dashboardSelectQuery)
    val boundStatement = new BoundStatement(selectStatement)
    val result: ResultSet = session.execute(boundStatement.bind(Data.s_id, Data.a_id, "odometer", "calculated"))
    var assetDashboardData: AssetDashboardData = null
    val row = result.one()
    if (row != null) {
     //doing some processing
    }
    assetDashboardData
  }

  def dashBoardInsert(session: Session, data: THData, batch: BatchStatement) {
    val insertStatement = session.prepare(insertDashBoardQuery)
    //insertStatement.setConsistencyLevel(ConsistencyLevel.QUORUM)
    val boundStatement = new BoundStatement(insertStatement)
    batch.add(boundStatement.bind(data.s_id, data.a_id, "odometer", "calculated", data.odometer, new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis()), data.a_s_no))
  }

  def dashBoardUpdate(session: Session, data: THData, batch: BatchStatement) {
    val updateStatement = session.prepare(updateDashBoardQuery)
    //insertStatement.setConsistencyLevel(ConsistencyLevel.QUORUM)
    val boundStatement = new BoundStatement(updateStatement)
    batch.add(boundStatement.bind(data.odometer, new Date(System.currentTimeMillis()), data.s_id, data.a_id, "odometer", "calculated"))
  }

.................

回答1:

Calling prepare Everytime is not a good idea..to avoid that you can simply keep a map of query string vs prepared statements..you can fill the cache at startup only hence prepare will be called only once ...now in your Cassandrautil methoda you will get prepared statement from map and create the bound statement and execute it.



回答2:

Datastax document already states that

You should prepare only once, and cache the PreparedStatement in your application (it is thread-safe). If you call prepare multiple times with the same query string, the driver will log a warning.

If you execute a query only once, a prepared statement is inefficient because it requires two roundtrips. Consider a simple statement instead.

Java Driver for Apache Cassandra 3.1 (Earlier version) you can check for specific version.

Cashing prepared statement is what is recommended for your application. For that ConcurrentHashMap is good alternate and keep using it (PreparedStatement) as said in the document it is THREAD SAFE. I hope same implementation is provided for scala as well.