ElasticsearchTemplate: retrieving big data sets

Posted 2019-08-21 03:52

Question:

I am new to ElasticsearchTemplate. I want to get 1000 documents from Elasticsearch based on my query. I built the query with QueryBuilder, and it works as expected. The following links state that large result sets can be retrieved using scan and scroll:

link one
link two

I am trying to implement this with the code below, which I copied from one of the links mentioned above, but I get the following error:

The type ResultsMapper is not generic; it cannot be parameterized with arguments <myInputDto>.

MyInputDto is a class in my project annotated with @Document. In the end, I just want to retrieve 1000 documents from Elasticsearch. I looked for a size parameter, but I don't think it is supported.

String scrollId = esTemplate.scan(searchQuery, 1000, false);
List<MyInputDto> sampleEntities = new ArrayList<MyInputDto>();
boolean hasRecords = true;
while (hasRecords) {
    Page<MyInputDto> page = esTemplate.scroll(scrollId, 5000L,
            new ResultsMapper<MyInputDto>() {
                @Override
                public Page<MyInputDto> mapResults(SearchResponse response) {
                    List<MyInputDto> chunk = new ArrayList<MyInputDto>();
                    for (SearchHit searchHit : response.getHits()) {
                        if (response.getHits().getHits().length <= 0) {
                            return null;
                        }
                        MyInputDto user = new MyInputDto();
                        user.setId(searchHit.getId());
                        user.setMessage((String) searchHit.getSource().get("message"));
                        chunk.add(user);
                    }
                    return new PageImpl<MyInputDto>(chunk);
                }
            });
    if (page != null) {
        sampleEntities.addAll(page.getContent());
        hasRecords = page.hasNextPage();
    } else {
        hasRecords = false;
    }
}

What is the issue here? Is there an alternative way to achieve this? I would also be thankful if somebody could explain how this code works behind the scenes.

Answer 1:

Solution 1

If you want to use ElasticsearchTemplate, it is simpler and more readable to use CriteriaQuery, which lets you set the page size with the setPageable method. Scrolling then returns the subsequent batches of data:

CriteriaQuery criteriaQuery = new CriteriaQuery(Criteria.where("productName").is("something"));
criteriaQuery.addIndices("prods");
criteriaQuery.addTypes("prod");
criteriaQuery.setPageable(PageRequest.of(0, 1000));

ScrolledPage<TestDto> scroll = (ScrolledPage<TestDto>) esTemplate.startScroll(3000, criteriaQuery, TestDto.class);
while (scroll.hasContent()) {
    LOG.info("Next page with 1000 elem: " + scroll.getContent());
    scroll = (ScrolledPage<TestDto>) esTemplate.continueScroll(scroll.getScrollId(), 3000, TestDto.class);
}
esTemplate.clearScroll(scroll.getScrollId());
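
On the question of what happens behind the scenes: roughly, the first scroll request makes Elasticsearch keep a search context alive for the given timeout and return the first batch together with a scroll ID. Each follow-up call with that ID returns the next batch, and an empty batch signals the end. The following is a minimal in-memory sketch of that contract using only the JDK. InMemoryScroll, nextPage and drain are hypothetical names made up for illustration; they are not part of Spring Data Elasticsearch or the Elasticsearch client.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ScrollSketch {

    /** Hypothetical stand-in for a server-side scroll context over a fixed result set. */
    static final class InMemoryScroll<T> {
        private final List<T> hits;
        private final int pageSize;
        private int position = 0; // in a real cluster the server tracks this; the client only holds a scroll ID

        InMemoryScroll(List<T> hits, int pageSize) {
            if (pageSize <= 0) {
                throw new IllegalArgumentException("pageSize must be positive");
            }
            this.hits = hits;
            this.pageSize = pageSize;
        }

        /** Returns the next batch, or an empty list once every hit has been handed out. */
        List<T> nextPage() {
            if (position >= hits.size()) {
                return Collections.emptyList();
            }
            int end = Math.min(position + pageSize, hits.size());
            List<T> page = new ArrayList<>(hits.subList(position, end));
            position = end;
            return page;
        }
    }

    /** Drains the scroll batch by batch, mirroring the while (scroll.hasContent()) loop. */
    static <T> List<T> drain(InMemoryScroll<T> scroll) {
        List<T> all = new ArrayList<>();
        List<T> page = scroll.nextPage();
        while (!page.isEmpty()) {
            all.addAll(page);
            page = scroll.nextPage();
        }
        return all;
    }

    public static void main(String[] args) {
        List<Integer> docs = new ArrayList<>();
        for (int i = 0; i < 2500; i++) {
            docs.add(i);
        }
        // With a page size of 1000 the batches have 1000, 1000 and 500 elements
        List<Integer> fetched = drain(new InMemoryScroll<>(docs, 1000));
        System.out.println("Fetched " + fetched.size() + " documents");
    }
}
```

The page size only controls how many hits come back per round trip; the loop ends when a batch comes back empty, which is why the code above checks hasContent() rather than counting pages.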

Solution 2

If you'd rather use org.elasticsearch.client.Client instead of ElasticsearchTemplate, the SearchRequestBuilder returned by prepareSearch lets you set the number of search hits returned per request with setSize:

QueryBuilder prodBuilder = ...;

SearchResponse scrollResp = client
        .prepareSearch("prods")
        .setScroll(new TimeValue(60000))
        .setSize(1000)
        .setTypes("prod")
        .setQuery(prodBuilder)
        .execute().actionGet();

ObjectMapper mapper = new ObjectMapper();
List<TestDto> products = new ArrayList<>();

try {
    do {
        for (SearchHit hit : scrollResp.getHits().getHits()) {
            products.add(mapper.readValue(hit.getSourceAsString(), TestDto.class));
        }
        LOG.info("Next page with 1000 elem: " + products);
        products.clear();
        scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
                .setScroll(new TimeValue(60000))
                .execute()
                .actionGet();
    } while (scrollResp.getHits().getHits().length != 0);
} catch (IOException e) {
    LOG.error("Exception while executing query", e);
}
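
Since the question only asks for 1000 documents, the scroll loop can also stop as soon as enough hits are collected instead of draining the whole result set. Below is a minimal JDK-only sketch of that early exit. takeUpTo and nextBatch are hypothetical names: nextBatch stands in for whatever fetches the next scroll page (for example, a wrapper around prepareSearchScroll), and the fake pages in main are made-up data.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class LimitedScroll {

    /**
     * Pulls batches from nextBatch until `limit` items are collected
     * or an empty batch signals that the scroll is exhausted.
     */
    static <T> List<T> takeUpTo(Supplier<List<T>> nextBatch, int limit) {
        List<T> collected = new ArrayList<>();
        while (collected.size() < limit) {
            List<T> batch = nextBatch.get();
            if (batch.isEmpty()) {
                break; // no more hits
            }
            // Take only as many items as are still missing
            int remaining = limit - collected.size();
            collected.addAll(batch.subList(0, Math.min(remaining, batch.size())));
        }
        return collected;
    }

    public static void main(String[] args) {
        // Three fake pages of 400 hits each (assumed data, not real search results)
        List<List<Integer>> pages = new ArrayList<>();
        for (int p = 0; p < 3; p++) {
            List<Integer> page = new ArrayList<>();
            for (int i = 0; i < 400; i++) {
                page.add(p * 400 + i);
            }
            pages.add(page);
        }
        Iterator<List<Integer>> it = pages.iterator();
        Supplier<List<Integer>> nextBatch =
                () -> it.hasNext() ? it.next() : Collections.<Integer>emptyList();

        List<Integer> firstThousand = takeUpTo(nextBatch, 1000);
        System.out.println("Collected " + firstThousand.size() + " documents");
    }
}
```

When stopping early against a real cluster, clearing the scroll (as Solution 1 does with clearScroll) releases the search context before its timeout expires.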