Reindexing Elasticsearch via Bulk API, scan and s

Published 2019-01-22 14:54

Question:

I am trying to re-index my Elasticsearch setup, and I am currently looking at the Elasticsearch documentation and an example using the Python API.

I'm a little confused as to how this all works, though. I was able to obtain the scroll ID from the Python API:

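from elasticsearch import Elasticsearch

es = Elasticsearch("myhost")

index = "myindex"
query = {"query": {"match_all": {}}}
# Open a scan/scroll search; the scroll context is kept alive for 10 minutes
response = es.search(index=index, doc_type="my-doc-type", body=query, search_type="scan", scroll="10m")

scroll_id = response["_scroll_id"]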

Now my question is: what use is this to me? What does knowing the scroll ID even give me? The documentation says to use the "Bulk API", but I have no idea how the scroll_id factors into this; it was a little confusing.

Could anyone give a brief example showing me how to re-index from this point, given that I've obtained the scroll_id correctly?

Answer 1:

Here is an example of reindexing to another Elasticsearch node using elasticsearch-py:

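from elasticsearch import Elasticsearch, helpers

es_src = Elasticsearch(["host"])  # source cluster
es_des = Elasticsearch(["host"])  # destination cluster

# Copy every document from src_index_name into des_index_name on the destination
helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des)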

You can also reindex the result of a query to a different index. Here is how to do it:

from elasticsearch import Elasticsearch, helpers

es_src = Elasticsearch(["host"])  # source cluster
es_des = Elasticsearch(["host"])  # destination cluster

# Only documents matching this query are copied
body = {"query": {"term": {"year": "2004"}}}
helpers.reindex(es_src, 'src_index_name', 'des_index_name', target_client=es_des, query=body)


Answer 2:

Hi, you can use the scroll API to go through all the documents in the most efficient way. The scroll_id identifies a session that is stored on the server for your specific scroll request, so you need to provide the scroll_id with each subsequent request to obtain more items.
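To make the role of the scroll_id concrete, here is a minimal Python sketch of a scroll loop. It assumes `client` is an elasticsearch-py `Elasticsearch` instance and uses the older-style `body=` argument from the question; the function name `iter_scroll_hits` and the default `size` are made up for illustration. It uses a plain scrolled search rather than search_type="scan", so the first response already contains hits.

```python
def iter_scroll_hits(client, index, query, scroll="10m", size=500):
    """Yield every hit of a scrolled search, one batch at a time.

    `client` is assumed to expose search(), scroll() and clear_scroll()
    the way an elasticsearch-py Elasticsearch instance does.
    """
    # The initial search opens the scroll session and returns its ID.
    response = client.search(index=index, body=query, scroll=scroll, size=size)
    scroll_id = response["_scroll_id"]
    try:
        hits = response["hits"]["hits"]
        while hits:
            for hit in hits:
                yield hit
            # Hand the scroll ID back to the server to fetch the next batch.
            response = client.scroll(scroll_id=scroll_id, scroll=scroll)
            scroll_id = response["_scroll_id"]
            hits = response["hits"]["hits"]
    finally:
        # Release the server-side session once we are done.
        client.clear_scroll(scroll_id=scroll_id)
```

An empty batch signals that the scroll is exhausted, which is the same break condition a hand-written loop would use.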

The bulk API is for indexing documents more efficiently. When copying an index you need both, but they are not really related: scroll reads the documents out of the old index, and bulk writes them into the new one.
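The link between the two steps is just a conversion from search hits to bulk actions. The following Python sketch shows one way to do that conversion; the function name `hits_to_actions` is hypothetical, and the action layout assumes the dictionary format accepted by `elasticsearch.helpers.bulk` (`_op_type`, `_index`, `_id`, `_source`, plus `_type` on older versions that still use document types).

```python
def hits_to_actions(hits, target_index):
    """Turn search hits into index actions aimed at target_index."""
    for hit in hits:
        action = {
            "_op_type": "index",
            "_index": target_index,   # write to the new index, not hit["_index"]
            "_id": hit["_id"],        # keep the original document ID
            "_source": hit["_source"],
        }
        # Older Elasticsearch versions return a document type; carry it over.
        if "_type" in hit:
            action["_type"] = hit["_type"]
        yield action
```

With a real destination client, the resulting generator could be passed to `helpers.bulk(es_des, hits_to_actions(hits, "des_index_name"))`, where `hits` is any iterable of hits read from the old index and `es_des` is the destination client.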

I do have some Java code that might help you get a better idea of how it works.

public void reIndex() {
    logger.info("Start creating a new index based on the old index.");

    SearchResponse searchResponse = client.prepareSearch(MUSIC_INDEX)
            .setQuery(matchAllQuery())
            .setSearchType(SearchType.SCAN)
            .setScroll(createScrollTimeoutValue())
            .setSize(SCROLL_SIZE).execute().actionGet();

    BulkProcessor bulkProcessor = BulkProcessor.builder(client,
            createLoggingBulkProcessorListener()).setBulkActions(BULK_ACTIONS_THRESHOLD)
            .setConcurrentRequests(BULK_CONCURRENT_REQUESTS)
            .setFlushInterval(createFlushIntervalTime())
            .build();

    while (true) {
        searchResponse = client.prepareSearchScroll(searchResponse.getScrollId())
                .setScroll(createScrollTimeoutValue()).execute().actionGet();

        if (searchResponse.getHits().getHits().length == 0) {
            logger.info("Closing the bulk processor");
            bulkProcessor.close();
            break; //Break condition: No hits are returned
        }

        for (SearchHit hit : searchResponse.getHits()) {
            IndexRequest request = new IndexRequest(MUSIC_INDEX_NEW, hit.type(), hit.id());
            request.source(hit.sourceRef());
            bulkProcessor.add(request);
        }
    }
}


Answer 3:

For anyone who runs into this problem, you can use the following API from the Python client to reindex:

https://elasticsearch-py.readthedocs.org/en/master/helpers.html#elasticsearch.helpers.reindex

This saves you from having to scroll through the search results yourself to get all the data and then use the bulk API to put it into the new index.