Can someone advise how to use elasticsearch.helpers.streaming_bulk instead of elasticsearch.helpers.bulk for indexing data into Elasticsearch?
If I simply swap streaming_bulk in place of bulk, nothing gets indexed, so I guess it needs to be called in a different way (perhaps something like the sketch at the end of this post).
The code below creates the index, puts the mapping, and indexes data from a CSV file in chunks of 500 elements into Elasticsearch. It works correctly, but I am wondering whether it is possible to increase performance. That is why I want to try out the streaming_bulk function.
Currently I need 10 minutes to index 1 million rows from a 200 MB CSV file. I use two machines running CentOS 6.6 with 8 CPUs, x86_64, CPU MHz: 2499.902, 15.574 GB RAM total. Not sure whether it can go any faster.
import csv
import json
import elasticsearch
from elasticsearch import helpers

es = elasticsearch.Elasticsearch([{'host': 'uxmachine-test', 'port': 9200}])

index_name = 'new_index'
type_name = 'new_type'

mapping = json.loads(open(config["index_mapping"]).read())  # read mapping from json file
es.indices.create(index_name)
es.indices.put_mapping(index=index_name, doc_type=type_name, body=mapping)

with open(file_to_index, 'rb') as csvfile:
    reader = csv.reader(csvfile)  # read documents for indexing from CSV file, more than a million rows
    batch_chunks = []
    iterator = 0

    for row in reader:
        var = transform_row_for_indexing(row, fields, index_name, type_name, id_name, id_increment)
        id_increment = id_increment + 1
        batch_chunks.append(var)
        if iterator % 500 == 0:
            helpers.bulk(es, batch_chunks)
            del batch_chunks[:]
            print "batch sent"
        iterator = iterator + 1

    # index the last (partial) batch
    if len(batch_chunks) != 0:
        helpers.bulk(es, batch_chunks)
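
For reference, this is how I guess streaming_bulk is supposed to be called, based on my reading of the elasticsearch-py helpers: it returns a generator of (ok, result) tuples, so nothing gets sent until that generator is actually consumed. A minimal sketch, assuming the same transform_row_for_indexing helper and variables as above (the generate_actions function is just something I made up for illustration):

# Sketch only: streaming_bulk is lazy, so its generator must be iterated
# for any documents to actually be sent to Elasticsearch.
def generate_actions(reader):
    doc_id = id_increment
    for row in reader:
        yield transform_row_for_indexing(row, fields, index_name, type_name, id_name, doc_id)
        doc_id += 1

with open(file_to_index, 'rb') as csvfile:
    reader = csv.reader(csvfile)
    # chunk_size controls how many actions are grouped into each bulk request
    for ok, result in helpers.streaming_bulk(es, generate_actions(reader), chunk_size=500):
        if not ok:
            print "failed to index document: %s" % (result,)

Is this the right way to use it, and should I expect it to be faster than batching helpers.bulk calls manually?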