Lucene 4.4.0 new ControlledRealTimeReopenThread sample usage

2019-04-11 02:22发布

In the new 4.4.0 release of Lucene, the Near Real Time Manager (org.apache.lucene.search.NRTManager) has been replaced by ControlledRealTimeReopenThread.

Does anyone have some sample code for the new ControlledRealTimeReopenThread usage?

EDIT: I answer my own question below

标签: lucene
3条回答
霸刀☆藐视天下
2楼-- · 2019-04-11 02:33

You should not commit after every document, and you shouldn't need to Thread.interrupt the reopen thread (in fact, this is deadly when you are using NIOFSDirectory). Instead, just call its (ControlledRealTimeReopenThread) close method: under the hood, it notifies itself and should finish quickly.

查看更多
姐就是有狂的资本
3楼-- · 2019-04-11 02:34

I've done some research on the question and built a utility type. It has not been thoroughly tested (especially under concurrent conditions), but it works and I'm pretty sure it's thread safe.

@Slf4j
public class LuceneIndex {
    private final IndexWriter _indexWriter;
    private final TrackingIndexWriter _trackingIndexWriter;
    private final ReferenceManager<IndexSearcher> _indexSearcherReferenceManager;
    private final ControlledRealTimeReopenThread<IndexSearcher> _indexSearcherReopenThread;

    private long _reopenToken;      // index update/delete methods returned token

////////// CONSTRUCTOR & FINALIZE
    /**
     * Constructor based on an instance of the type responsible of the lucene index persistence
     */
    @Inject
    public LuceneIndex(final Directory luceneDirectory,
                       final Analyzer analyzer) {
        try {
            // [1]: Create the indexWriter
            _indexWriter = new IndexWriter(luceneDirectory,
                                           new IndexWriterConfig(LuceneConstants.VERSION,
                                                                 analyzer));

            // [2a]: Create the TrackingIndexWriter to track changes to the delegated previously created IndexWriter 
            _trackingIndexWriter = new TrackingIndexWriter(_indexWriter);

            // [2b]: Create an IndexSearcher ReferenceManager to safelly share IndexSearcher instances across
            //       multiple threads
            _indexSearcherReferenceManager = new SearcherManager(_indexWriter,
                                                                 true,
                                                                 null);

            // [3]: Create the ControlledRealTimeReopenThread that reopens the index periodically having into 
            //      account the changes made to the index and tracked by the TrackingIndexWriter instance
            //      The index is refreshed every 60sc when nobody is waiting 
            //      and every 100 millis whenever is someone waiting (see search method)
            //      (see http://lucene.apache.org/core/4_3_0/core/org/apache/lucene/search/NRTManagerReopenThread.html)
            _indexSearcherReopenThread = new ControlledRealTimeReopenThread<IndexSearcher>(_trackingIndexWriter,
                                                                                           _indexSearcherReferenceManager,
                                                                                           60.00,   // when there is nobody waiting
                                                                                           0.1);    // when there is someone waiting
            _indexSearcherReopenThread.start(); // start the refresher thread
        } catch (IOException ioEx) {
            throw new IllegalStateException("Lucene index could not be created: " + ioEx.getMessage());
        }
    }
    @Override
    protected void finalize() throws Throwable {
        this.close();
        super.finalize();
    }
    /**
     * Closes every index
     */
    public void close() {
        try {
            // stop the index reader re-open thread
            _indexSearcherReopenThread.interrupt();
            _indexSearcherReopenThread.close();

            // Close the indexWriter, commiting everithing that's pending
            _indexWriter.commit();
            _indexWriter.close();

        } catch(IOException ioEx) {
            log.error("Error while closing lucene index: {}",ioEx.getMessage(),
                                                             ioEx);
        }
    }
////////// INDEX
    /**
     * Index a Lucene document
     * @param doc the document to be indexed
     */
    public void index(final Document doc) { 
        try {
            _reopenToken = _trackingIndexWriter.addDocument(doc);
            log.debug("document indexed in lucene");
        } catch(IOException ioEx) {
            log.error("Error while in Lucene index operation: {}",ioEx.getMessage(),
                                                                  ioEx);
        } finally {
            try {
                _indexWriter.commit();
            } catch (IOException ioEx) {
                log.error("Error while commiting changes to Lucene index: {}",ioEx.getMessage(),
                                                                              ioEx);
            }
        }
    }
    /**
     * Updates the index info for a lucene document
     * @param doc the document to be indexed
     */
    public void reIndex(final Term recordIdTerm,
                        final Document doc) {   
        try {
            _reopenToken = _trackingIndexWriter.updateDocument(recordIdTerm, 
                                                               doc);
            log.debug("{} document re-indexed in lucene",recordIdTerm.text());
        } catch(IOException ioEx) {
            log.error("Error in lucene re-indexing operation: {}",ioEx.getMessage(),
                                                                  ioEx);
        } finally {
            try {
                _indexWriter.commit();
            } catch (IOException ioEx) {
                log.error("Error while commiting changes to Lucene index: {}",ioEx.getMessage(),
                                                                              ioEx);
            }
        }
    }
    /**
     * Unindex a lucene document
     * @param idTerm term used to locate the document to be unindexed
     *               IMPORTANT! the term must filter only the document and only the document
     *                          otherwise all matching docs will be unindexed
     */
    public void unIndex(final Term idTerm) {
        try {
            _reopenToken = _trackingIndexWriter.deleteDocuments(idTerm);
            log.debug("{}={} term matching records un-indexed from lucene",idTerm.field(),
                                                                           idTerm.text());
        } catch(IOException ioEx) {
            log.error("Error in un-index lucene operation: {}",ioEx.getMessage(),
                                                               ioEx);           
        } finally {
            try {
                _indexWriter.commit(); 
            } catch (IOException ioEx) {
                log.error("Error while commiting changes to Lucene index: {}",ioEx.getMessage(),
                                                                              ioEx);
            }
        }
    }
    /**
     * Delete all lucene index docs
     */
    public void truncate() {
        try {
            _reopenToken = _trackingIndexWriter.deleteAll();
            log.warn("lucene index truncated");
        } catch(IOException ioEx) {
            log.error("Error truncating lucene index: {}",ioEx.getMessage(),
                                                          ioEx);            
        } finally {
            try {
                _indexWriter.commit(); 
            } catch (IOException ioEx) {
                log.error("Error truncating lucene index: {}",ioEx.getMessage(),
                                                              ioEx);
            }
        }
    }
/////// COUNT-SEARCH
    /**
     * Count the number of results returned by a search against the lucene index
     * @param qry the query
     * @return
     */
    public long count(final Query qry) {
        long outCount = 0;
        try {
            _indexSearcherReopenThread.waitForGeneration(_reopenToken);     // wait untill the index is re-opened
            IndexSearcher searcher = _indexSearcherReferenceManager.acquire();
            try {
                TopDocs docs = searcher.search(qry,0);
                if (docs != null) outCount = docs.totalHits;
                log.debug("count-search executed against lucene index returning {}",outCount);
            } finally {
                _indexSearcherReferenceManager.release(searcher);
            }
        } catch (IOException ioEx) {
            log.error("Error re-opening the index {}",ioEx.getMessage(),
                                                      ioEx);
        } catch (InterruptedException intEx) {
            log.error("The index writer periodically re-open thread has stopped",intEx.getMessage(),
                                                                                 intEx);
        }
        return outCount;
    }
    /**
     * Executes a search query
     * @param qry the query to be executed
     * @param sortFields the search query criteria
     * @param firstResultItemOrder the order number of the first element to be returned
     * @param numberOfResults number of results to be returnee
     * @return a page of search results
     */
    public LucenePageResults search(final Query qry,Set<SortField> sortFields,
                                    final int firstResultItemOrder,final int numberOfResults) {
        LucenePageResults outDocs = null;
        try {
            _indexSearcherReopenThread.waitForGeneration(_reopenToken); // wait until the index is re-opened for the last update
            IndexSearcher searcher = _indexSearcherReferenceManager.acquire();
            try {
                // sort crieteria
                SortField[] theSortFields = null;
                if (CollectionUtils.hasData(sortFields)) theSortFields = CollectionUtils.toArray(sortFields,SortField.class);
                Sort theSort = CollectionUtils.hasData(theSortFields) ? new Sort(theSortFields)
                                                                      : null;
                // number of results to be returned
                int theNumberOfResults = firstResultItemOrder + numberOfResults;

                // Exec the search (if the sort criteria is null, they're not used)
                TopDocs scoredDocs = theSort != null ? searcher.search(qry,
                                                                       theNumberOfResults,
                                                                       theSort)
                                                     : searcher.search(qry,
                                                                       theNumberOfResults);
                log.debug("query {} {} executed against lucene index: returned {} total items, {} in this page",qry.toString(),
                                                                                                                (theSort != null ? theSort.toString() : ""),
                                                                                                                scoredDocs != null ? scoredDocs.totalHits : 0,
                                                                                                                scoredDocs != null ? scoredDocs.scoreDocs.length : 0);
                outDocs = LucenePageResults.create(searcher,
                                                   scoredDocs,
                                                   firstResultItemOrder,numberOfResults);
            } finally {
                _indexSearcherReferenceManager.release(searcher);
            }
        } catch (IOException ioEx) {
            log.error("Error freeing the searcher {}",ioEx.getMessage(),
                                                      ioEx);
        } catch (InterruptedException intEx) {
            log.error("The index writer periodically re-open thread has stopped",intEx.getMessage(),
                                                                                 intEx);
        }
        return outDocs;
    }
/////// INDEX MAINTEINANCE
    /**
     * Mergest the lucene index segments into one
     * (this should NOT be used, only rarely for index mainteinance)
     */
    public void optimize() {
        try {
            _indexWriter.forceMerge(1);
            log.debug("Lucene index merged into one segment");
        } catch (IOException ioEx) {
            log.error("Error optimizing lucene index {}",ioEx.getMessage(),
                                                         ioEx);
        }
    }
}

EDIT 2: For those using the previous Lucene 4.3 Near Real Time Manager (NRTManager) type, this is the analogous code:

@Slf4j
public class LuceneIndexForLucene43 {
    private final IndexWriter _indexWriter;
    private final TrackingIndexWriter _trackingIndexWriter;
    private final NRTManager _searchManager;

    LuceneNRTReopenThread _reopenThread = null;
    private long _reopenToken;  // index update/delete methods returned token

///// CONSTRUCTOR
    /**
     * Constructor based on an instance of the type responsible of the lucene index persistence
     */
    @Inject
    public LuceneIndexForLucene43(final Directory luceneDirectory,
                       final Analyzer analyzer) {
        try {
            // Create the indexWriter
            _indexWriter = new IndexWriter(luceneDirectory,
                                           new IndexWriterConfig(LuceneConstants.VERSION,
                                                                 analyzer));
            _trackingIndexWriter = new NRTManager.TrackingIndexWriter(_indexWriter);
            // Create the SearchManager to exec the search
            _searchManager = new NRTManager(_trackingIndexWriter,
                                            new SearcherFactory(),
                                            true);

            // Open the thread in charge of re-open the index to allow it to see real-time changes
            //      The index is refreshed every 60sc when nobody is waiting 
            //      and every 100 millis whenever is someone waiting (see search method)
            // (see http://lucene.apache.org/core/4_3_0/core/org/apache/lucene/search/NRTManagerReopenThread.html)
            _reopenThread = new LuceneNRTReopenThread(_searchManager,
                                                      60.0,     // when there is nobody waiting
                                                      0.1);     // when there is someone waiting
            _reopenThread.startReopening();

        } catch (IOException ioEx) {
            throw new IllegalStateException("Lucene index could not be created: " + ioEx.getMessage());
        }
    }
    @Override
    protected void finalize() throws Throwable {
        this.close();
        super.finalize();
    }
    /**
     * Closes every index
     */
    public void close() {
        try {
            // stop the index reader re-open thread
            _reopenThread.stopReopening();
            _reopenThread.interrupt();

            // Close the search manager
            _searchManager.close();

            // Close the indexWriter, commiting everithing that's pending
            _indexWriter.commit();
            _indexWriter.close();

        } catch(IOException ioEx) {
            log.error("Error while closing lucene index: {}",ioEx.getMessage(),
                                                             ioEx);
        }
    }
//////// REOPEN-THREAD: Thread in charge of re-open the IndexReader to have access to the 
//                      latest IndexWriter changes
    private class LuceneNRTReopenThread
          extends NRTManagerReopenThread {

        volatile boolean _finished = false;

        public LuceneNRTReopenThread(final NRTManager manager,
                                     final double targetMaxStaleSec,final double targetMinStaleSec) {
            super(manager, targetMaxStaleSec, targetMinStaleSec);
            this.setName("NRT Reopen Thread");
            this.setPriority(Math.min(Thread.currentThread().getPriority()+2, 
                                      Thread.MAX_PRIORITY));
            this.setDaemon(true);
        }
        public synchronized  void startReopening() {
            _finished = false;
            this.start();
        }
        public synchronized void stopReopening() {
            _finished = true;
        }
        @Override
        public void run() {
            while (!_finished) {
                super.run();
            }
        }
    }
/////// INDEX
    /**
     * Index a Lucene document
     * @param doc the document to be indexed
     */
    public void index(final Document doc) { 
        try {
            _reopenToken = _trackingIndexWriter.addDocument(doc);
            log.debug("document indexed in lucene");
        } catch(IOException ioEx) {
            log.error("Error while in Lucene index operation: {}",ioEx.getMessage(),
                                                                  ioEx);
        } finally {
            try {
                _indexWriter.commit();
            } catch (IOException ioEx) {
                log.error("Error while commiting changes to Lucene index: {}",ioEx.getMessage(),
                                                                              ioEx);
            }
        }
    }
    /**
     * Updates the index info for a lucene document
     * @param doc the document to be indexed
     */
    public void reIndex(final Term recordIdTerm,
                        final Document doc) {   
        try {
            _reopenToken = _trackingIndexWriter.updateDocument(recordIdTerm, 
                                                               doc);
            log.debug("{} document re-indexed in lucene",recordIdTerm.text());
        } catch(IOException ioEx) {
            log.error("Error in lucene re-indexing operation: {}",ioEx.getMessage(),
                                                                  ioEx);
        } finally {
            try {
                _indexWriter.commit();
            } catch (IOException ioEx) {
                log.error("Error while commiting changes to Lucene index: {}",ioEx.getMessage(),
                                                                              ioEx);
            }
        }
    }
    /**
     * Unindex a lucene document
     * @param idTerm term used to locate the document to be unindexed
     *               IMPORTANT! the term must filter only the document and only the document
     *                          otherwise all matching docs will be unindexed
     */
    public void unIndex(final Term idTerm) {
        try {
            _reopenToken = _trackingIndexWriter.deleteDocuments(idTerm);
            log.debug("{}={} term matching records un-indexed from lucene",idTerm.field(),
                                                                           idTerm.text());
        } catch(IOException ioEx) {
            log.error("Error in un-index lucene operation: {}",ioEx.getMessage(),
                                                               ioEx);           
        } finally {
            try {
                _indexWriter.commit(); 
            } catch (IOException ioEx) {
                log.error("Error while commiting changes to Lucene index: {}",ioEx.getMessage(),
                                                                              ioEx);
            }
        }
    }
    /**
     * Delete all lucene index docs
     */
    public void truncate() {
        try {
            _reopenToken = _trackingIndexWriter.deleteAll();
            log.warn("lucene index truncated");
        } catch(IOException ioEx) {
            log.error("Error truncating lucene index: {}",ioEx.getMessage(),
                                                          ioEx);            
        } finally {
            try {
                _indexWriter.commit(); 
            } catch (IOException ioEx) {
                log.error("Error truncating lucene index: {}",ioEx.getMessage(),
                                                              ioEx);
            }
        }
    }
////// COUNT-SEARCH
    /**
     * Count the number of results returned by a search against the lucene index
     * @param qry the query
     * @return
     */
    public long count(final Query qry) {
        long outCount = 0;
        try {
            _searchManager.waitForGeneration(_reopenToken);     // wait untill the index is re-opened
            IndexSearcher searcher = _searchManager.acquire();
            try {
                TopDocs docs = searcher.search(qry,0);
                if (docs != null) outCount = docs.totalHits;
                log.debug("count-search executed against lucene index returning {}",outCount);
            } finally {
                _searchManager.release(searcher);
            }
        } catch (IOException ioEx) {
            log.error("Error re-opening the index {}",ioEx.getMessage(),
                                                      ioEx);
        }
        return outCount;
    }
    /**
     * Executes a search query
     * @param qry the query to be executed
     * @param sortFields the search query criteria
     * @param firstResultItemOrder the order number of the first element to be returned
     * @param numberOfResults number of results to be returnee
     * @return a page of search results
     */
    public LucenePageResults search(final Query qry,Set<SortField> sortFields,
                                    final int firstResultItemOrder,final int numberOfResults) {
        LucenePageResults outDocs = null;
        try {
            _searchManager.waitForGeneration(_reopenToken); // wait until the index is re-opened for the last update
            IndexSearcher searcher = _searchManager.acquire();
            try {
                // sort crieteria
                SortField[] theSortFields = null;
                if (CollectionUtils.hasData(sortFields)) theSortFields = CollectionUtils.toArray(sortFields,SortField.class);
                Sort theSort = CollectionUtils.hasData(theSortFields) ? new Sort(theSortFields)
                                                                      : null;
                // number of results to be returned
                int theNumberOfResults = firstResultItemOrder + numberOfResults;

                // Exec the search (if the sort criteria is null, they're not used)
                TopDocs scoredDocs = theSort != null ? searcher.search(qry,
                                                                       theNumberOfResults,
                                                                       theSort)
                                                     : searcher.search(qry,
                                                                       theNumberOfResults);
                log.debug("query {} {} executed against lucene index: returned {} total items, {} in this page",qry.toString(),
                                                                                                                (theSort != null ? theSort.toString() : ""),
                                                                                                                scoredDocs != null ? scoredDocs.totalHits : 0,
                                                                                                                scoredDocs != null ? scoredDocs.scoreDocs.length : 0);
                outDocs = LucenePageResults.create(searcher,
                                                   scoredDocs,
                                                   firstResultItemOrder,numberOfResults);
            } finally {
                _searchManager.release(searcher);
            }
        } catch (IOException ioEx) {
            log.error("Error freeing the searcher {}",ioEx.getMessage(),
                                                      ioEx);
        }
        return outDocs;
    }
/////// INDEX MAINTEINANCE
    /**
     * Mergest the lucene index segments into one
     * (this should NOT be used, only rarely for index mainteinance)
     */
    public void optimize() {
        try {
            _indexWriter.forceMerge(1);
            log.debug("Lucene index merged into one segment");
        } catch (IOException ioEx) {
            log.error("Error optimizing lucene index {}",ioEx.getMessage(),
                                                         ioEx);
        }
    }
}
查看更多
冷血范
4楼-- · 2019-04-11 02:53

'commit' flushes all pending data to disk. So if commit is used, there is no need to use ControlledRealTimeReopenThread. ControlledRealTimeReopenThread coordinates write and read operations in real time.

  • For add document: ReferenceManager.maybeRefresh()

  • For search operation: ReferenceManager.acquire(), op.., ReferenceManager.release()

Test Code

package kr.nsri.lucene440;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexWriterConfig.OpenMode;
import org.apache.lucene.index.TrackingIndexWriter;
import org.apache.lucene.search.ControlledRealTimeReopenThread;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

/**
 * Demo driver: opens (or creates) an index, starts a {@link ControlledRealTimeReopenThread},
 * runs one {@link WriteDoc} and one {@link ReadDoc} task concurrently for one minute and then
 * shuts everything down.
 */
public class AddRandom {

    /** Index location used when no path is given on the command line. */
    private static final String DEFAULT_INDEX_PATH = "/Users/inseog/tmp/lu440/";

    /** How long the writer and reader tasks are left running. */
    private static final long RUN_TIME_MILLIS = 1L * 60L * 1000L;

    /**
     * @param args optional: {@code args[0]} is the index directory
     *             (defaults to {@value #DEFAULT_INDEX_PATH})
     * @throws IOException if the index cannot be opened or closed
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws IOException,
            InterruptedException {

        String indexPath = (args != null && args.length > 0) ? args[0]
                : DEFAULT_INDEX_PATH;

        // Basic Environment
        FSDirectory dir = FSDirectory.open(new File(indexPath));
        try {
            Analyzer analyzer = new StandardAnalyzer(Version.LUCENE_44);
            IndexWriterConfig indexWriterConfig = new IndexWriterConfig(
                    Version.LUCENE_44, analyzer);
            indexWriterConfig.setOpenMode(OpenMode.CREATE_OR_APPEND);
            IndexWriter indexWriter = new IndexWriter(dir, indexWriterConfig);
            try {
                runWithWriter(indexWriter);
            } finally {
                indexWriter.close();
            }
        } finally {
            dir.close();
        }

    }

    /**
     * Sets up the real-time handlers on top of the writer, runs the workload and closes the
     * handlers again (reopen thread first, then the searcher manager), even on failure.
     */
    private static void runWithWriter(IndexWriter indexWriter)
            throws IOException, InterruptedException {
        // Real time handler
        TrackingIndexWriter trackingIndexWriter = new TrackingIndexWriter(
                indexWriter);
        ReferenceManager<IndexSearcher> searcherManager = new SearcherManager(
                indexWriter, false, null);
        try {
            // thread handling
            ControlledRealTimeReopenThread<IndexSearcher> writeControlThread = new ControlledRealTimeReopenThread<IndexSearcher>(
                    trackingIndexWriter, searcherManager, 1.0, 0.1);
            writeControlThread.setName("Update Reopen Thread");
            writeControlThread.setPriority(Math.min(Thread.currentThread()
                    .getPriority() + 2, Thread.MAX_PRIORITY));
            writeControlThread.setDaemon(true);
            writeControlThread.start();
            try {
                runWorkload(trackingIndexWriter, searcherManager);
                System.out.println("Closing...");
            } finally {
                writeControlThread.close();
            }
        } finally {
            searcherManager.close();
        }
    }

    /** Starts the writer and reader tasks, lets them run for one minute and stops them. */
    private static void runWorkload(TrackingIndexWriter trackingIndexWriter,
            ReferenceManager<IndexSearcher> searcherManager)
            throws InterruptedException {
        WriteDoc wdoc = new WriteDoc(trackingIndexWriter, searcherManager);
        ReadDoc rdoc = new ReadDoc(searcherManager);

        ExecutorService exman = Executors.newFixedThreadPool(5);
        try {
            exman.submit(wdoc);
            exman.submit(rdoc);

            Thread.sleep(RUN_TIME_MILLIS);
        } finally {
            // The tasks loop until their thread is interrupted, so a plain shutdown() would never
            // end them (and the non-daemon pool threads would keep the JVM alive).
            // NOTE(review): an interrupt that lands in the middle of channel-based index I/O can
            // close the file channel; acceptable here because the index is closed right after.
            exman.shutdownNow();
            exman.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

}

package kr.nsri.lucene440;

import java.util.Calendar;
import java.util.Random;
import java.util.concurrent.Callable;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.util.Version;

/**
 * Reader task: until its thread is interrupted (or an error occurs) it repeatedly acquires the
 * current searcher, prints the document count of the "content" field plus the content of up to
 * 2000 documents matching "abcde", releases the searcher and sleeps for a random time below 15
 * seconds.
 */
public class ReadDoc implements Callable<Long> {

    private final ReferenceManager<IndexSearcher> searcherManager;
    private final Random rand = new Random(Calendar.getInstance()
            .getTimeInMillis());
    private final Analyzer analyzer = new StandardAnalyzer(Version.LUCENE_44);
    private final QueryParser parser = new QueryParser(Version.LUCENE_44, "content",
            analyzer);

    public ReadDoc(ReferenceManager<IndexSearcher> searchManager) {
        this.searcherManager = searchManager;
    }

    /**
     * Runs the read loop.
     *
     * @return always {@code 0L}
     */
    @Override
    public Long call() {
        // isInterrupted() (unlike Thread.interrupted()) leaves the flag set for the caller
        while (!Thread.currentThread().isInterrupted()) {
            try {
                //
                // get index searcher from searcherManager
                //
                IndexSearcher wsrch = searcherManager.acquire();
                try {
                    //
                    // read current status
                    //
                    System.out.println("------------------ total count: "
                            + wsrch.collectionStatistics("content").docCount());
                    Query query = parser.parse("abcde");
                    TopDocs topDocs = wsrch.search(query, null, 2000);
                    ScoreDoc[] scoreDocs = topDocs.scoreDocs;
                    for (ScoreDoc sd : scoreDocs) {
                        Document wd = wsrch.doc(sd.doc);
                        System.out.println("\tRead: " + wd.get("content"));
                    }
                } finally {
                    //
                    // always release the index searcher (even when the search fails) so the
                    // manager can drop old readers on reopen
                    //
                    searcherManager.release(wsrch);
                }
                //
                // random wait
                //
                Thread.sleep(rand.nextInt(15000));
            } catch (InterruptedException e) {
                // normal shutdown path: restore the flag and stop
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                // stop on any failure, but say why instead of ending silently
                System.err.println("ReadDoc stopped: " + e);
                break;
            }
        }

        return 0L;

    }
}

package kr.nsri.lucene440;

import java.io.IOException;
import java.util.Calendar;
import java.util.Random;
import java.util.concurrent.Callable;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.TrackingIndexWriter;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ReferenceManager;

/**
 * Writer task: until its thread is interrupted (or an I/O error occurs) it repeatedly adds a
 * document with random "tweetID", "userScreenname" and "content" fields, asks the searcher
 * manager to refresh and sleeps for a random time below 15 seconds.
 */
public class WriteDoc implements Callable<Long> {

    private static final String WORD_ALPHABET = "abcdefghijklmnopqrstuvwxyz_ABCDEFGHIJKLMNOPQRSTUVWXYZ";

    private final TrackingIndexWriter trackingIndexWriter;
    private final ReferenceManager<IndexSearcher> searcherManager;
    private final Random rand = new Random(Calendar.getInstance()
            .getTimeInMillis());

    public WriteDoc(TrackingIndexWriter trackingIndexWriter,
            ReferenceManager<IndexSearcher> searcherManager) {
        this.trackingIndexWriter = trackingIndexWriter;
        this.searcherManager = searcherManager;
    }

    /**
     * Runs the write loop.
     *
     * @return the number of documents added
     */
    @Override
    public Long call() {
        long wcnt = 0L;
        // isInterrupted() (unlike Thread.interrupted()) leaves the flag set for the caller
        while (!Thread.currentThread().isInterrupted()) {
            long tweetID = rand.nextLong();
            String userName = anyWord();
            String content = anySentence();
            Document doc = new Document();
            doc.add(new LongField("tweetID", tweetID, Store.YES));
            doc.add(new StringField("userScreenname", userName, Store.YES));
            doc.add(new TextField("content", content, Store.YES));
            try {
                //
                // add document
                //
                trackingIndexWriter.addDocument(doc);
                System.out.println("\t"+"new doc: "+content);
                //
                // signal that the searcher may be reopened to include the new document
                //
                searcherManager.maybeRefresh();
                ++wcnt;
                //
                // random wait
                //
                Thread.sleep(rand.nextInt(15000));
            } catch (InterruptedException e) {
                // normal shutdown path: restore the flag and stop
                Thread.currentThread().interrupt();
                break;
            } catch (IOException e) {
                // stop on I/O failure, but say why instead of ending silently
                System.err.println("WriteDoc stopped: " + e);
                break;
            }
        }

        return wcnt;
    }

    /** Builds a random word of 1 to 9 characters taken from {@link #WORD_ALPHABET}. */
    private String anyWord() {
        int wordLen = 1 + rand.nextInt(9);
        int baseLen = WORD_ALPHABET.length();
        StringBuilder word = new StringBuilder(wordLen);
        // exactly wordLen characters (the previous loop bound of 1 + wordLen added one extra)
        for (int wx = 0; wx < wordLen; ++wx)
            word.append(WORD_ALPHABET.charAt(rand.nextInt(baseLen)));
        return word.toString();
    }

    /**
     * Builds 1 to 20 random words separated by spaces and terminated by the fixed word "abcde",
     * so that a search for "abcde" matches every generated document.
     */
    private String anySentence() {
        int wordCount = 1 + rand.nextInt(20);
        StringBuilder sentence = new StringBuilder();
        for (int wx = 0; wx < wordCount; ++wx)
            sentence.append(anyWord()).append(' ');
        return sentence.append("abcde").toString();
    }

}
查看更多
登录 后发表回答