CassandraOperations ingest not writing complete list

Posted 2019-09-12 04:18

Question:

I am trying to build high-speed inserts/deletes into a Cassandra database, but about a fourth of the data is not getting inserted.

When using Spring's Repository.save(List) everything is fine (but catastrophic in terms of CPU usage), so I tried the provided CassandraOperations.ingest() (or CQLOperations.ingest()). With small amounts of data everything works, but once I get to several hundred records I start losing data.
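For context, the repository-based version that works (but burns CPU) is roughly the following. The repository interface name and the String ID type are my own sketch, not the actual code:

    import java.util.List;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.repository.CrudRepository;

    // Assumed repository for the LNSData entity; the ID type (String) is a guess.
    public interface LNSDataRepository extends CrudRepository<LNSData, String> {
    }

    // In the service that persists the buffered data:
    @Autowired
    private LNSDataRepository lnsDataRepository;

    public void persistWithRepository(List<LNSData> lbufferedData) {
        // save(Iterable) maps and inserts each entity one by one through the
        // Spring Data mapping layer, which is where the CPU time goes.
        lnsDataRepository.save(lbufferedData);
    }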

Here is an extract of the insert code:

private final String insertPreparedCql = "insert into keyspace.MyData (s, f, m, b, av, c, AD, ga, an, ti, tim, pre, pre2, de, ch, cha, sig, rs, abc, frO, sub, sub1, en, eni, prod, gde, gude, gpde, gpd, gat, gong, glt) "
        + "values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";

    List<List<?>> llData = new ArrayList<>();

    while(!lbufferedData.isEmpty()){

        LNSData dat = lbufferedData.remove(0);
        List<Object> lUniqData = new ArrayList<>();

        lUniqData.add(dat.getS());
        lUniqData.add(dat.getF());
        lUniqData.add(dat.getM());
        lUniqData.add(dat.getB());
        lUniqData.add(dat.getAv());
        lUniqData.add(dat.getC());
        lUniqData.add(dat.getAD());
        lUniqData.add(dat.getGa());
        lUniqData.add(dat.getAn());
        lUniqData.add(dat.getTi());
        lUniqData.add(dat.getTim());
        lUniqData.add(dat.getPre());
        lUniqData.add(dat.getPre2());
        lUniqData.add(dat.getDe());
        lUniqData.add(dat.getCl());
        lUniqData.add(dat.getCSI());
        lUniqData.add(dat.getSSI());
        lUniqData.add(dat.getRev());
        lUniqData.add(dat.getABC());
        lUniqData.add(dat.getFet());
        lUniqData.add(dat.getSus());
        lUniqData.add(dat.getSus1());
        lUniqData.add(dat.getEe());
        lUniqData.add(dat.getEni());
        lUniqData.add(dat.getPId());
        lUniqData.add(dat.getGude());
        lUniqData.add(dat.getGde());
        lUniqData.add(dat.getGpde());
        lUniqData.add(dat.getGid());
        lUniqData.add(dat.getGLat());
        lUniqData.add(dat.getGng());
        lUniqData.add(dat.getGt());

        llData.add(lUniqData);

    }
    if(llData.size()>0){
        logger.debug("Inserting {} elements processed data", llData.size());
        cassandraTemplate.ingest(insertPreparedCql, llData);
    }

When running 'watch -n 1 nodetool tpstats' I noticed that Native-Transport-Requests had non-zero "All time blocked" values, so I set the option -Dcassandra.max_queued_native_transport_requests=4096. No more blocked (or dropped) requests, but the data is still not written completely.

I tried calling ingest only once with all the elements, and also multiple times in smaller batches, but the result is always the same (with varying amounts of lost data).
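For reference, the chunked variant differs from the code above only in how the rows are sliced before each ingest call (the chunk size of 100 is arbitrary):

    // Same data as above, ingested in fixed-size chunks instead of one call.
    // The chunk size is arbitrary; the outcome was the same either way.
    int chunkSize = 100;
    for (int i = 0; i < llData.size(); i += chunkSize) {
        List<List<?>> chunk = llData.subList(i, Math.min(i + chunkSize, llData.size()));
        logger.debug("Inserting chunk of {} elements", chunk.size());
        cassandraTemplate.ingest(insertPreparedCql, chunk);
    }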

The same happens with a delete that is based on the PRIMARY KEY columns.
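The delete is built the same way as the insert above; simplified, it looks like this (the key columns s and tim are placeholders here, not necessarily the table's actual PRIMARY KEY):

    // Delete variant, built like the insert. The columns in the WHERE clause
    // (s, tim) are placeholders for the real PRIMARY KEY columns.
    private final String deletePreparedCql = "delete from keyspace.MyData where s = ? and tim = ?";

    List<List<?>> llKeys = new ArrayList<>();
    for (LNSData dat : lbufferedData) {
        List<Object> lKey = new ArrayList<>();
        lKey.add(dat.getS());
        lKey.add(dat.getTim());
        llKeys.add(lKey);
    }
    if (!llKeys.isEmpty()) {
        logger.debug("Deleting {} elements", llKeys.size());
        cassandraTemplate.ingest(deletePreparedCql, llKeys);
    }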

Thanks for your help.