I am indexing PDFs using the Java API. I have installed the ingest-attachment processor plugin,
and from my Java code I convert each PDF to base64 and index the encoded content.
The PDFs are stored on my machine's d:\ drive. The file paths are stored in an Elasticsearch index named documents_local.
So I fetch all the records from the documents_local index and get each file path.
Then I read the PDF file, encode it as base64, and index it.
For this process I am using the scroll API to fetch the file paths from the index, because I have more than 100k
documents. Indexing 20,000 PDFs takes 8 hours with the Java code below.
How can I increase the throughput or speed up this process? It is taking a very long time to index 100k
documents.
/**
 * Copies the records found in the {@code documents_local} index into the
 * {@code document_attachment} index, attaching the base64-encoded bytes of the
 * file each record refers to and routing every index request through the
 * {@code document_attachment} ingest pipeline.
 */
public class DocumentIndex {

    private static final String INDEX = "documents_local";
    private static final String ATTACHMENT = "document_attachment";
    private static final String TYPE = "doc";

    // Named after this class; element [0] of the current stack trace is the
    // Thread.getStackTrace frame itself, not the declaring class.
    private static final Logger logger = Logger.getLogger(DocumentIndex.class.getName());

    /**
     * Scrolls through every hit of {@code documents_local} and indexes one
     * document per hit into {@code document_attachment}.
     *
     * @param args unused
     * @throws IOException if a search, scroll, index or clear-scroll request
     *                     fails at the I/O level, or if a file cannot be read
     */
    public static void main(String args[]) throws IOException {
        Document doc = new Document();
        logger.info("Started Indexing the Document.....");

        // Fetching Id, FilePath & FileName from Document Index.
        SearchRequest searchRequest = new SearchRequest(INDEX);
        searchRequest.types(TYPE);
        final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(60L));
        searchRequest.scroll(scroll);
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.matchAllQuery());
        searchRequest.source(searchSourceBuilder);

        SearchResponse searchResponse = SearchEngineClient.getInstance3().search(searchRequest);
        String scrollId = searchResponse.getScrollId();
        SearchHit[] searchHits = searchResponse.getHits().getHits();

        // Running count across ALL scroll pages (starts at 1, as the log lines are 1-based).
        int totalAvailableFile = 1;

        // Both log files are opened once for the whole run. The exception log in
        // particular must not be reopened per document: opening it without append
        // truncates it and would discard every previously recorded stack trace.
        try (PrintWriter availableLog =
                     new PrintWriter(new FileOutputStream(new File("d:\\Available_Files.txt"), true));
             PrintStream exceptionLog = new PrintStream(new File("d:\\exception.txt"))) {

            while (searchHits != null && searchHits.length > 0) {
                // Process the current page first; the initial search response
                // already carries the first page of hits.
                for (SearchHit hit : searchHits) {
                    Map<String, Object> sourceAsMap = hit.getSourceAsMap();
                    if (sourceAsMap == null) {
                        // Without a source there is nothing to copy; skipping avoids
                        // re-indexing the values left in doc by the previous hit.
                        continue;
                    }
                    doc.setId((int) sourceAsMap.get("id"));
                    doc.setApp_language(String.valueOf(sourceAsMap.get("app_language")));

                    // NOTE(review): nothing in this class sets the path or filename on doc;
                    // presumably Document (or code not shown here) supplies them - confirm.
                    String filepath = doc.getPath().concat(doc.getFilename());
                    File file = new File(filepath);

                    String encodedFile = null;
                    if (file.exists() && !file.isDirectory()) {
                        availableLog.println("Available File Count --->" + totalAvailableFile
                                + ":::::::ID---> " + doc.getId() + "File Path --->" + filepath);
                        totalAvailableFile++;
                        encodedFile = encodeBase64(file);
                    }

                    indexDocument(doc, encodedFile, exceptionLog);
                }

                // Only now ask for the next page.
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                scrollRequest.scroll(scroll);
                searchResponse = SearchEngineClient.getInstance3().searchScroll(scrollRequest);
                scrollId = searchResponse.getScrollId();
                searchHits = searchResponse.getHits().getHits();
            }
        }

        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.addScrollId(scrollId);
        ClearScrollResponse clearScrollResponse =
                SearchEngineClient.getInstance3().clearScroll(clearScrollRequest);
        if (!clearScrollResponse.isSucceeded()) {
            logger.info("Clearing the scroll context did not succeed.");
        }
        logger.info("Indexing done.....");
    }

    /**
     * Reads the whole file and returns its content as a single base64 string.
     * Encoding the complete byte array in one call avoids depending on how many
     * bytes an individual stream read happens to return.
     */
    private static String encodeBase64(File file) throws IOException {
        byte[] content = java.nio.file.Files.readAllBytes(file.toPath());
        return Base64.getEncoder().encodeToString(content);
    }

    /**
     * Sends one index request for {@code doc} through the attachment pipeline;
     * an ElasticsearchException is written to {@code exceptionLog} and does not
     * stop the run.
     */
    private static void indexDocument(Document doc, String encodedFile, PrintStream exceptionLog)
            throws IOException {
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("id", doc.getId());
        jsonMap.put("app_language", doc.getApp_language());
        // Stays null when the file was missing or a directory.
        jsonMap.put("fileContent", encodedFile);

        String id = Long.toString(doc.getId());
        IndexRequest request = new IndexRequest(ATTACHMENT, TYPE, id)
                .source(jsonMap)
                .setPipeline(ATTACHMENT);
        try {
            SearchEngineClient.getInstance3().index(request);
        } catch (ElasticsearchException e) {
            e.printStackTrace(exceptionLog);
        }
    }
}