I'm trying to queue up tasks in a thread pool so that each one is executed as soon as a worker becomes free. I have found various examples of this, but in every case the example uses a new Worker instance for each job; I want persistent workers.
I'm trying to make an FTP backup tool. I have it working, but because it is limited to a single connection it is slow. Ideally, I want a single connection for scanning directories and building up a file list, then four workers to download those files.
Here is an example of my FTP worker:
    // FTPClient is assumed to come from Apache Commons Net
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.OutputStream;
    import org.apache.commons.net.ftp.FTPClient;

    public class Worker implements Runnable {
        protected FTPClient _ftp;

        // Connection details
        protected String _host = "";
        protected String _user = "";
        protected String _pass = "";

        // Worker status; volatile because other threads read it via inUse()
        protected volatile boolean _working = false;

        public Worker(String host, String user, String pass) {
            this._host = host;
            this._user = user;
            this._pass = pass;
        }

        // Check if the worker is in use
        public boolean inUse() {
            return this._working;
        }

        @Override
        public void run() {
            this._ftp = new FTPClient();
            this._connect();
        }

        // Download a file from the ftp server
        public boolean download(String base, String path, String file) {
            this._working = true;
            try {
                // Create the local directory if it does not exist
                File pathDir = new File(base + path);
                if (!pathDir.exists()) {
                    pathDir.mkdirs();
                }
                // Download the file; try-with-resources closes the stream even on failure
                try (OutputStream output = new FileOutputStream(base + path + file)) {
                    return this._ftp.retrieveFile(file, output);
                }
            } catch (Exception e) {
                return false;
            } finally {
                // No return here: returning from finally would swallow exceptions
                this._working = false;
            }
        }

        // Connect to the server
        protected boolean _connect() {
            try {
                this._ftp.connect(this._host);
                this._ftp.login(this._user, this._pass);
            } catch (Exception e) {
                return false;
            }
            return this._ftp.isConnected();
        }

        // Disconnect from the server
        protected void _disconnect() {
            try {
                this._ftp.disconnect();
            } catch (Exception e) { /* do nothing */ }
        }
    }
I want to be able to call Worker.download(...) for each task in a queue whenever a worker becomes available, without having to create a new connection to the FTP server for each download.
Any help would be appreciated as I've never used threads before and I'm going round in circles at the moment.
> the examples have been set up to use a new Worker instance for each job, I want persistent workers.
This is a common question with a couple of different solutions. What you want is some context per thread, as opposed to per Runnable or Callable that you would be submitting to an ExecutorService.
One option would be to have a ThreadLocal which would create your ftp instances. This is not optimal because there would be no easy way to shut down the ftp connection when the thread is terminated. You would then limit the number of connections by limiting the number of threads running in your thread pool.
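As a rough illustration of the ThreadLocal option, here is a minimal sketch. The `Connection` class is a hypothetical stand-in for an FTP client (it is not part of any library), and the names `ThreadLocalConnections` and `run` are made up for this example. It also shows the drawback mentioned above: nothing in it closes the connections when the pool's threads exit.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ThreadLocalConnections {
    // Hypothetical stand-in for an FTP connection; a real one would connect and log in.
    static final class Connection {
        private static final AtomicInteger OPENED = new AtomicInteger();
        final int id = OPENED.incrementAndGet();

        String download(String file) {
            return "conn-" + id + ":" + file;
        }
    }

    // Each pool thread lazily creates its own Connection on first use.
    private static final ThreadLocal<Connection> CONNECTION =
            ThreadLocal.withInitial(Connection::new);

    // Runs the given number of tasks and returns how many distinct connections were used.
    static int run(int threads, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        Set<Integer> used = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < tasks; i++) {
            final String file = "file" + i;
            pool.execute(() -> {
                Connection c = CONNECTION.get(); // reused across tasks on the same thread
                c.download(file);
                used.add(c.id);
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return used.size();
    }

    public static void main(String[] args) {
        // 20 tasks share at most 4 connections, one per pool thread.
        System.out.println(run(4, 20));
    }
}
```

The number of connections is bounded by the pool size, but the tasks themselves are still fresh Runnables for every job.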
I think a better solution would be to use the ExecutorService only to fork your worker threads. For each worker, inject into them a BlockingQueue that they all use to dequeue and perform the tasks they need to do. This is separate from the queue used internally by the ExecutorService. You would then add the tasks to your queue and not to the ExecutorService itself.
    // LinkedBlockingQueue is unbounded; an ArrayBlockingQueue would need a capacity argument
    private static final BlockingQueue<FtpTask> taskQueue
            = new LinkedBlockingQueue<FtpTask>();
So your task object would have something like:
    public static class FtpTask {
        String base;
        String path;
        String file;
    }
Then the run() method in your Worker class would do something like:
    public void run() {
        // make our permanent ftp instance
        this._ftp = new FTPClient();
        // connect it for the life of this thread
        this._connect();
        try {
            // loop getting tasks until we are interrupted
            // could also use volatile boolean !shutdown
            while (!Thread.currentThread().isInterrupted()) {
                FtpTask task = taskQueue.take();
                // if you are using a poison pill
                // (SHUTDOWN_TASK would be a shared sentinel FtpTask instance)
                if (task == SHUTDOWN_TASK) {
                    break;
                }
                // do the download here
                download(task.base, task.path, task.file);
            }
        } catch (InterruptedException e) {
            // take() was interrupted: restore the flag and let the thread exit
            Thread.currentThread().interrupt();
        } finally {
            this._disconnect();
        }
    }
Again, you limit the number of connections by limiting the number of threads running in your thread-pool.
> What I ideally want to do is have a single connection for scanning directories and building up a file list then four workers to download said files.
I would use Executors.newFixedThreadPool(5) and submit five long-running jobs to it: one that does the scanning/building and four workers that do the downloading. The scanning thread would be putting to the BlockingQueue while the worker threads are taking from the same queue.
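To tie the pieces together, here is a minimal end-to-end sketch of that layout: one scanner and several persistent workers sharing a BlockingQueue, shut down with one poison pill per worker. The class and method names (`PersistentWorkersSketch`, `Task`, `run`) are hypothetical, and the FTP calls are replaced by comments and a list append so the example runs without a server.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class PersistentWorkersSketch {
    // Hypothetical task type; a real one would carry base, path and file.
    static final class Task {
        final String file;
        Task(String file) { this.file = file; }
    }

    // Poison pill, compared by identity so it can never collide with a real task.
    static final Task SHUTDOWN = new Task(null);

    // Returns the files the workers processed (order depends on scheduling).
    static List<String> run(List<String> files, int workers) {
        final BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
        final List<String> downloaded = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers + 1);

        // Scanner: builds the file list, then tells every worker to stop.
        pool.execute(() -> {
            try {
                for (String f : files) {
                    queue.put(new Task(f));
                }
                for (int i = 0; i < workers; i++) {
                    queue.put(SHUTDOWN);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Workers: each would open one connection and keep it for every task it takes.
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                // a real worker would connect to the FTP server here, once
                try {
                    while (true) {
                        Task task = queue.take();
                        if (task == SHUTDOWN) {
                            break;
                        }
                        downloaded.add(task.file); // stands in for download(...)
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // ...and disconnect here, once
            });
        }

        pool.shutdown(); // no new jobs; the five running jobs finish on their own
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(downloaded);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a.txt", "b.txt", "c.txt"), 4));
    }
}
```

One pill per worker is needed because each worker consumes exactly one before exiting; a single pill would stop only one of them.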
I would suggest using a ThreadPoolExecutor with the core pool size and maximum pool size set as per your requirements. Also use a LinkedBlockingQueue in this case, which will hold your tasks in FIFO order.
As soon as a thread (worker) becomes free, the next task will be picked from the queue and executed.
Check out the details of ThreadPoolExecutor. Let me know in case you get stuck anywhere in the implementation.
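For reference, a minimal sketch of that setup follows. The class name `FifoPoolSketch` and the integer job ids are made up for illustration. Note that with an unbounded LinkedBlockingQueue the pool never grows beyond its core size, so the maximum pool size has no effect; the sketch simply sets both to the same value.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class FifoPoolSketch {
    // Submits `tasks` jobs to a pool of `poolSize` threads and returns the completion order.
    static List<Integer> run(int poolSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize,                    // core and maximum pool size
                0L, TimeUnit.MILLISECONDS,             // keep-alive for extra threads (unused here)
                new LinkedBlockingQueue<Runnable>());  // unbounded FIFO work queue
        List<Integer> completed = new CopyOnWriteArrayList<>();
        for (int i = 0; i < tasks; i++) {
            final int jobId = i;
            pool.execute(() -> completed.add(jobId));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed;
    }

    public static void main(String[] args) {
        // With a single thread, jobs complete in submission order.
        System.out.println(run(1, 5));
    }
}
```

This approach still creates a new Runnable per job, so any per-connection state has to live somewhere tied to the thread rather than to the task.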