Threadpool with persistent worker instances

Posted 2019-08-31 07:52

Question:

I'm trying to queue up tasks in a thread pool so that each one is executed as soon as a worker becomes free. I have found various examples of this, but in all cases the examples have been set up to use a new Worker instance for each job; I want persistent workers.

I'm trying to make an FTP backup tool. I have it working, but because it is limited to a single connection it is slow. What I ideally want to do is have a single connection for scanning directories and building up a file list, then four workers to download said files.

Here is an example of my FTP worker:

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;

public class Worker implements Runnable {
  protected FTPClient _ftp;

  // Connection details
  protected String _host = "";
  protected String _user = "";
  protected String _pass = "";

  // Worker status; volatile so other threads calling inUse() see updates
  protected volatile boolean _working = false;

  public Worker(String host, String user, String pass) {
    this._host = host;
    this._user = user;
    this._pass = pass;
  }

  // Check if the worker is in use
  public boolean inUse() {
    return this._working;
  }

  @Override
  public void run() {
    this._ftp = new FTPClient();
    this._connect();
  }

  // Download a file from the FTP server
  public boolean download(String base, String path, String file) {
    this._working = true;
    try {
      // Create the local directory if it does not exist
      File pathDir = new File(base + path);
      if (!pathDir.exists()) {
        pathDir.mkdirs();
      }

      // Download the file (the remote name is resolved against this
      // connection's current working directory); try-with-resources
      // closes the local file even if the transfer fails
      try (OutputStream output = new FileOutputStream(base + path + file)) {
        return this._ftp.retrieveFile(file, output);
      }
    } catch (IOException e) {
      return false;
    } finally {
      // Runs on every path without overriding the return value
      this._working = false;
    }
  }

  // Connect to the server
  protected boolean _connect() {
    try {
      this._ftp.connect(this._host);
      if (!this._ftp.login(this._user, this._pass)) {
        return false;
      }
      // Transfer in binary mode so non-text files are not corrupted
      this._ftp.setFileType(FTP.BINARY_FILE_TYPE);
    } catch (IOException e) {
      return false;
    }
    return this._ftp.isConnected();
  }

  // Disconnect from the server
  protected void _disconnect() {
    try {
      this._ftp.disconnect();
    } catch (IOException e) { /* do nothing */ }
  }
}

I want to be able to call Worker.download(...) for each task in a queue whenever a worker becomes available, without having to create a new connection to the FTP server for each download.

Any help would be appreciated as I've never used threads before and I'm going round in circles at the moment.

Answer 1:

"the examples have been set up to use a new Worker instance for each job; I want persistent workers."

This is a common question with a couple of different solutions. What you want is some context per thread, as opposed to per Runnable or Callable that you would be submitting to an ExecutorService.

One option would be to have a ThreadLocal that creates your FTP instances, one per pool thread. This is not optimal because there is no easy way to shut down the FTP connection when the thread is terminated. You would then limit the number of connections by limiting the number of threads running in your thread pool.
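A minimal sketch of the ThreadLocal option. It assumes a hypothetical `FakeConnection` class that stands in for `FTPClient` so the example runs without Commons Net. Each pool thread lazily creates one connection the first time it calls `get()` and reuses it for every later task. Note that nothing ever closes those connections, which is the drawback described above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ThreadLocalConnections {
    // Counts how many hypothetical connections were opened.
    static final AtomicInteger OPENED = new AtomicInteger();

    // Hypothetical stand-in for FTPClient (not a real library class).
    static class FakeConnection {
        FakeConnection() { OPENED.incrementAndGet(); }
        void download(String file) { /* pretend to transfer the file */ }
    }

    // One connection per pool thread, created lazily on the first get().
    static final ThreadLocal<FakeConnection> CONNECTION =
            ThreadLocal.withInitial(FakeConnection::new);

    // Runs `tasks` downloads on `threads` pool threads; returns connections opened.
    static int run(int tasks, int threads) throws InterruptedException {
        OPENED.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < tasks; i++) {
            String file = "file" + i;
            pool.execute(() -> CONNECTION.get().download(file));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        // Nothing here closes the per-thread connections: that is the drawback.
        return OPENED.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("connections opened: " + run(10, 2));
    }
}
```

Ten tasks share at most two connections because there are only two pool threads.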

I think a better solution would be to use the ExecutorService only to fork your worker threads. Inject into each worker a shared BlockingQueue that they all use to dequeue and perform the tasks they need to do. This is separate from the queue used internally by the ExecutorService. You would then add the tasks to your queue, not to the ExecutorService itself.

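// ArrayBlockingQueue requires a capacity (1000 is an arbitrary choice);
// when it is full, put() blocks the producer until a worker takes a task
private static final BlockingQueue<FtpTask> taskQueue
        = new ArrayBlockingQueue<FtpTask>(1000);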
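A bounded `ArrayBlockingQueue` also gives the scanning thread backpressure: once the queue is full, `put` blocks and `offer` returns `false`. A fast directory scan therefore cannot build an unbounded backlog in memory. Here is a small sketch with an arbitrary capacity of 2 and no consumer:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedQueueDemo {
    // Returns how many of `attempts` offers a queue of `capacity` accepts
    // while nobody is consuming from it.
    static int acceptedOffers(int capacity, int attempts) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() never blocks: it returns false when the queue is full
            if (queue.offer("file" + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // put() would instead block on the third element until a worker takes one
        System.out.println(acceptedOffers(2, 5));
    }
}
```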

So your task object would have something like:

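public static class FtpTask {
     String base;
     String path;
     String file;
}

// poison-pill task that tells a worker to stop (compared by reference)
public static final FtpTask SHUTDOWN_TASK = new FtpTask();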

Then the run() method in your Worker class would do something like:

@Override
public void run() {
    // make our permanent ftp instance
    this._ftp = new FTPClient();
    // connect it for the life of this thread
    this._connect();
    try {
        // loop getting tasks until we are interrupted
        // could also use volatile boolean !shutdown
        while (!Thread.currentThread().isInterrupted()) {
            // blocks until a task is available
            FtpTask task = taskQueue.take();
            // if you are using a poison pill
            if (task == SHUTDOWN_TASK) {
                break;
            }
            // do the download here
            download(task.base, task.path, task.file);
        }
    } catch (InterruptedException e) {
        // take() was interrupted: restore the flag and let the thread exit
        Thread.currentThread().interrupt();
    } finally {
        this._disconnect();
    }
}
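The loop comment mentions a `volatile boolean` flag as an alternative to interruption or a poison pill. Below is a minimal sketch of that variant, using a hypothetical `FlagWorker` class. It calls `poll` with a timeout so it re-checks the flag even while the queue is empty, and it drains the remaining tasks before stopping. Strings stand in for `FtpTask`, and a counter stands in for the download.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FlagWorker implements Runnable {
    private final BlockingQueue<String> queue;
    private final AtomicInteger done;
    // volatile so the worker sees the write made by the controlling thread
    private volatile boolean shutdown = false;

    FlagWorker(BlockingQueue<String> queue, AtomicInteger done) {
        this.queue = queue;
        this.done = done;
    }

    void requestShutdown() {
        shutdown = true;
    }

    @Override
    public void run() {
        // a real worker would open its persistent connection here
        try {
            // stop only once shutdown is requested AND the queue is drained
            while (!shutdown || !queue.isEmpty()) {
                // poll with a timeout so the flag is re-checked while idle
                String task = queue.poll(100, TimeUnit.MILLISECONDS);
                if (task != null) {
                    done.incrementAndGet(); // stand-in for download(...)
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // ...and close the connection afterwards (e.g. in a finally block)
    }

    // Feeds `tasks` items to `workers` FlagWorkers; returns items processed.
    static int runDemo(int tasks, int workers) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger done = new AtomicInteger();
        List<FlagWorker> all = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            FlagWorker w = new FlagWorker(queue, done);
            all.add(w);
            pool.execute(w);
        }
        for (int i = 0; i < tasks; i++) {
            queue.put("file" + i);
        }
        for (FlagWorker w : all) {
            w.requestShutdown();
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed: " + runDemo(50, 3));
    }
}
```

The trade-off is that an idle worker wakes up every 100 ms to check the flag, whereas a worker blocked in `take()` waiting for a poison pill uses no CPU at all.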

Again, you limit the number of connections by limiting the number of threads running in your thread pool.

"What I ideally want to do is have a single connection for scanning directories and building up a file list, then four workers to download said files."

I would use Executors.newFixedThreadPool(5) and submit one task that does the scanning/building plus four worker tasks that do the downloading. The scanning thread puts tasks onto the BlockingQueue while the worker threads take from the same queue.
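Below is a self-contained sketch of that layout. It makes three assumptions: a hypothetical `FakeFtp` class stands in for `FTPClient`, the "scan" just generates file names, and a single shared poison-pill object stops the workers. The pool has five threads, one scanner and four persistent workers, and each worker opens exactly one connection. After the last real task, the scanner puts one pill per worker. Each worker exits after taking a single pill, so every worker receives exactly one.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ScanAndDownload {
    static final int WORKERS = 4;

    // Task object: which remote file to fetch and where to put it.
    static final class FtpTask {
        final String path;
        final String file;
        FtpTask(String path, String file) { this.path = path; this.file = file; }
    }

    // Poison pill: compared by reference, never downloaded.
    static final FtpTask SHUTDOWN_TASK = new FtpTask(null, null);

    static final AtomicInteger CONNECTIONS = new AtomicInteger();
    static final AtomicInteger DOWNLOADS = new AtomicInteger();

    // Hypothetical stand-in for FTPClient that only counts activity.
    static final class FakeFtp {
        FakeFtp() { CONNECTIONS.incrementAndGet(); }
        void retrieve(FtpTask task) { DOWNLOADS.incrementAndGet(); }
        void disconnect() { }
    }

    static void runDemo(int files) throws InterruptedException {
        CONNECTIONS.set(0);
        DOWNLOADS.set(0);
        BlockingQueue<FtpTask> taskQueue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(1 + WORKERS);

        // Scanner: a real one would list directories over its own connection.
        pool.execute(() -> {
            try {
                for (int i = 0; i < files; i++) {
                    taskQueue.put(new FtpTask("/dir/", "file" + i));
                }
                // one pill per worker so that every worker stops
                for (int i = 0; i < WORKERS; i++) {
                    taskQueue.put(SHUTDOWN_TASK);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Persistent workers: one connection each for the thread's lifetime.
        for (int w = 0; w < WORKERS; w++) {
            pool.execute(() -> {
                FakeFtp ftp = new FakeFtp();
                try {
                    while (true) {
                        FtpTask task = taskQueue.take();
                        if (task == SHUTDOWN_TASK) {
                            break;
                        }
                        ftp.retrieve(task);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    ftp.disconnect();
                }
            });
        }

        pool.shutdown(); // accept no new jobs; running ones finish normally
        pool.awaitTermination(30, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        runDemo(20);
        System.out.println(CONNECTIONS.get() + " connections, "
                + DOWNLOADS.get() + " downloads");
    }
}
```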



Answer 2:

I would suggest using a ThreadPoolExecutor with the core pool size and maximum pool size set to suit your requirements. In this case, also use a LinkedBlockingQueue, which will hold your tasks and hand them out in FIFO order.

As soon as a thread (worker) becomes free, the next task is taken from the queue and executed.
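Here is a minimal sketch of that configuration. Core and maximum pool size are both set to 4, an arbitrary choice that matches the four downloaders. Because the LinkedBlockingQueue is unbounded, the pool never grows past its core size, so extra tasks wait in FIFO order until a thread is free. On its own, this still treats each task as a separate Runnable. A persistent connection per thread would need something extra, such as the ThreadLocal approach from the first answer.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedPoolDemo {
    static final AtomicInteger COMPLETED = new AtomicInteger();

    // Runs `tasks` jobs on a 4-thread pool; returns how many distinct threads ran them.
    static int run(int tasks) throws InterruptedException {
        COMPLETED.set(0);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4,                          // core and maximum pool size
                0L, TimeUnit.MILLISECONDS,     // keep-alive only affects threads above core size
                new LinkedBlockingQueue<>());  // unbounded queue, tasks taken in FIFO order
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                threadNames.add(Thread.currentThread().getName());
                COMPLETED.incrementAndGet(); // stand-in for one download
            });
        }
        pool.shutdown(); // already-queued tasks still run
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return threadNames.size();
    }

    public static void main(String[] args) throws InterruptedException {
        int threads = run(20);
        System.out.println(COMPLETED.get() + " tasks on " + threads + " threads");
    }
}
```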

Check out the details of ThreadPoolExecutor, and let me know if you get stuck anywhere while implementing it.