多线程搜索操作(Multithreaded search operation)

2019-07-30 10:54发布

我有一个需要查询的阵列的方法,我需要运行它们针对不同的搜索引擎的Web API的,如谷歌的或雅虎。 为了并行的过程中,一个线程产生了对于每个查询,然后再join末版,因为之后每天查询的结果我的应用程序只能继续。 目前,我有东西沿着这些路线:

public abstract class class Query extends Thread {
    private String query;

    public abstract Result[] querySearchEngine();
    @Override
    public void run() {
        Result[] results = querySearchEngine(query);
        Querier.addResults(results);
    }

}

public class GoogleQuery extends Query {
    public Result querySearchEngine(String query) { 
        // access google rest API
    }
}

public class Querier {
    /* Every class that implements Query fills this array */
    private static ArrayList<Result> aggregatedResults;

    public static void addResults(Result[]) { // add to aggregatedResults }

    public static Result[] queryAll(Query[] queries) {
        /* for each thread, start it, to aggregate results */
        for (Query query : queries) {
            query.start();
        }
        for (Query query : queries) {
            query.join();
        }
        return aggregatedResults;
    }
}

最近,我发现有在Java中做兼职一个新的 API。 即, Callable接口, FutureTaskExecutorService 。 我在想,如果这个新的API是应该使用的一个,如果它们比传统的更有效率, RunnableThread

研究这种新的API后,我想出了下面的代码(简体版):

   public abstract class Query implements Callable<Result[]> {
        private final String query; // gets set in the constructor

        public abstract Result[] querySearchEngine();
        @Override
        public Result[] call() {
            return querySearchEngine(query);
        }
    }

public class Querier {   
        private ArrayList<Result> aggregatedResults;

        public Result[] queryAll(Query[] queries) {
            List<Future<Result[]>> futures = new ArrayList<Future<Result[]>>(queries.length);
            final ExecutorService service = Executors.newFixedThreadPool(queries.length);  
            for (Query query : queries) {
                futures.add(service.submit(query));  
            }
            for (Future<Result[]> future : futures) {  
                aggregatedResults.add(future.get());  // get() is somewhat similar to join?
            }  
            return aggregatedResults;
        }
    }

我是新来的这个并发API,我想知道是否有东西,可以在上面的代码加以改进 ,如果它比第一种选择更好(使用Thread )。 有一些课程我没有探索,如FutureTask ,等等。 我很乐意听到任何意见为好。

Answer 1:

几个问题与您的代码。

  1. 你或许应该使用的ExecutorService.invokeAll()方法。 创建新线程和一个新的线程池的成本可能显著(尽管也许不是相对于调用外部搜索引擎)。 的invokeAll()可以管理线程你。
  2. 你可能不希望混合阵列和仿制药。
  3. 您呼叫aggregatedResults.add()代替中的addAll()。
  4. 你并不需要使用成员变量时,他们可以是本地的queryAll()函数调用。

所以,像下面应该工作:

public abstract class Query implements Callable<List<Result>> {
    private final String query; // gets set in the constructor

    public abstract List<Result> querySearchEngine();
    @Override
    public List<Result> call() {
        return querySearchEngine(query);
    }
}

public class Querier {   
    private static final ExecutorService executor = Executors.newCachedThreadPool();

    public List<Result> queryAll(List<Query> queries) {
        List<Future<List<Result>>> futures = executor.submitAll(queries);
        List<Result> aggregatedResults = new ArrayList<Result>();
        for (Future<List<Result>> future : futures) {  
            aggregatedResults.addAll(future.get());  // get() is somewhat similar to join?
        }  
        return aggregatedResults;
    }
}


Answer 2:

作为futher改进,你可以考虑使用一个CompletionService它分离提交和检索的顺序,而非把从你参加他们完成的顺序结果队列中的所有未来的结果..



Answer 3:

我可以建议你使用的Future.get()的超时 ?

否则,它会只需要一个搜索引擎不响应使一切都停止(它甚至不需要是一个搜索引擎的问题,比如说,如果你方有一个网络问题)



文章来源: Multithreaded search operation