I'm trying to queue up tasks in a thread pool so that each one is executed as soon as a worker becomes free. I have found various examples of this, but in every case the examples create a new Worker instance for each job; I want persistent workers.
I'm trying to make an FTP backup tool. I have it working, but because it is limited to a single connection it is slow. Ideally, I want a single connection for scanning directories and building up a file list, then four workers to download those files.
Here is an example of my FTP worker:
public class Worker implements Runnable {
    protected FTPClient _ftp;
    // Connection details
    protected String _host = "";
    protected String _user = "";
    protected String _pass = "";
    // worker status
    protected boolean _working = false;
    public Worker(String host, String user, String pass) {
        this._host = host;
        this._user = user;
        this._pass = pass;
    }
    // Check if the worker is in use
    public boolean inUse() {
        return this._working;
    }
    @Override
    public void run() {
        this._ftp = new FTPClient();
        this._connect();
    }
    // Download a file from the ftp server
    public boolean download(String base, String path, String file) {
        this._working = true;
        boolean outcome = true;
        //create directory if not exists
        File pathDir = new File(base + path);
        if (!pathDir.exists()) {
            pathDir.mkdirs();
        }
        //download file
        try {
            OutputStream output = new FileOutputStream(base + path + file);
            this._ftp.retrieveFile(file, output);
            output.close();
        } catch (Exception e) {
            outcome = false;
        } finally {
            this._working = false;
            return outcome;
        }
    }
    // Connect to the server
    protected boolean _connect() {
        try {
            this._ftp.connect(this._host);
            this._ftp.login(this._user, this._pass);
        } catch (Exception e) {
            return false;
        }
        return this._ftp.isConnected();
    }
    // Disconnect from the server
    protected void _disconnect() {
        try {
            this._ftp.disconnect();
        } catch (Exception e) { /* do nothing */ }
    }
}
I want to be able to call Worker.download(...) for each task in a queue whenever a worker becomes available, without having to create a new connection to the FTP server for each download.
Any help would be appreciated as I've never used threads before and I'm going round in circles at the moment.
I would suggest using a ThreadPoolExecutor, with the core pool size and maximum pool size set as per your requirements. Also use a LinkedBlockingQueue in this case, which will hold your tasks in FIFO order.
As soon as a thread (worker) becomes free, the next task will be picked from the queue and executed.
Check out the details of ThreadPoolExecutor. Let me know in case you get stuck anywhere in the implementation.
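As a rough illustration of that suggestion, here is a minimal sketch of a ThreadPoolExecutor with four threads backed by a LinkedBlockingQueue. The class name PoolSketch, the method downloadAll and the pool size of four are assumptions made for this example, and the "download" is only a placeholder that records the file name. Note that this sketch does not by itself give each thread its own persistent FTP connection.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSketch {
    // Submits one task per file name and returns the names that were processed.
    static List<String> downloadAll(List<String> files) {
        // Core size == max size == 4, so the pool never grows beyond four threads.
        // Tasks that cannot run yet wait in the LinkedBlockingQueue in FIFO order.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        List<String> done = new CopyOnWriteArrayList<>();
        for (String file : files) {
            // Placeholder for a real download; here we only record the name.
            pool.execute(() -> done.add(file));
        }
        pool.shutdown(); // accept no new tasks; already queued tasks still run
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done;
    }

    public static void main(String[] args) {
        System.out.println(downloadAll(Arrays.asList("a.txt", "b.txt", "c.txt")).size());
    }
}
```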
This is a common question with a couple of different solutions. What you want is some context per thread, as opposed to per Runnable or Callable that would be submitted to an ExecutorService.
One option would be to have a ThreadLocal which would create your ftp instances. This is not optimal because there would be no easy way to shut down the FTP connection when the thread is terminated. You would then limit the number of connections by limiting the number of threads running in your thread pool.
I think a better solution would be to use the ExecutorService only to fork your worker threads. For each worker, inject a BlockingQueue that they all use to dequeue and perform the tasks they need to do. This is separate from the queue used internally by the ExecutorService. You would then add the tasks to your queue and not to the ExecutorService itself.
So your task object would hold whatever one download needs, such as the arguments to download(...). Then the run() method in your Worker class would connect once and then loop, taking tasks from the shared queue and downloading each one. Again, you limit the number of connections by limiting the number of threads running in your thread pool.
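The task object and the worker's run() loop described above might look roughly like the following. This is only a sketch under stated assumptions: the names QueueWorkerSketch, DownloadTask, STOP and runOneWorker are hypothetical, the FTP calls are replaced by log entries so the example runs without a server, and a "stop" sentinel task is one possible way (not the only one) to tell a worker to finish. A plain Thread is used here just to keep the example short.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueWorkerSketch {
    // Hypothetical task object: just the arguments of one download(...) call.
    static final class DownloadTask {
        final String base, path, file;
        DownloadTask(String base, String path, String file) {
            this.base = base; this.path = path; this.file = file;
        }
    }

    // Sentinel task telling a worker to stop (an assumption of this sketch).
    static final DownloadTask STOP = new DownloadTask("", "", "");

    static final class Worker implements Runnable {
        private final BlockingQueue<DownloadTask> queue;
        private final List<String> log;

        Worker(BlockingQueue<DownloadTask> queue, List<String> log) {
            this.queue = queue;
            this.log = log;
        }

        @Override
        public void run() {
            log.add("connect"); // stands in for _connect(), done once per worker
            try {
                while (true) {
                    DownloadTask task = queue.take(); // blocks until a task arrives
                    if (task == STOP) break;
                    // Stands in for download(task.base, task.path, task.file).
                    log.add("download " + task.path + task.file);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                log.add("disconnect"); // stands in for _disconnect()
            }
        }
    }

    // Runs a single worker over the given files and returns what it did, in order.
    static List<String> runOneWorker(String... files) {
        BlockingQueue<DownloadTask> queue = new LinkedBlockingQueue<>();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        for (String f : files) queue.add(new DownloadTask("/backup", "/docs/", f));
        queue.add(STOP);
        Thread t = new Thread(new Worker(queue, log));
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [connect, download /docs/a.txt, download /docs/b.txt, disconnect]
        System.out.println(runOneWorker("a.txt", "b.txt"));
    }
}
```

The point of the design is that the connection lives as long as the worker's run() method, so it is opened once and reused for every task taken from the queue.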
I would have an Executors.newFixedThreadPool(5); and add one thread which does the scanning/building and 4 worker threads that are doing the downloading. The scanning thread would be putting to the BlockingQueue while the worker threads are taking from the same queue.
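A minimal sketch of that wiring, assuming a pool of five threads (one scanner, four downloaders) that share one BlockingQueue. The names BackupSketch, backup and STOP are made up for this example, file names stand in for real task objects, and the download itself is reduced to incrementing a counter. The scanner enqueues one STOP marker per worker so that each worker exits once the list is exhausted.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BackupSketch {
    static final int WORKERS = 4;
    // Unique marker object, compared by identity, so a file really named "STOP" is safe.
    static final String STOP = new String("STOP");

    // Returns how many files the workers "downloaded".
    static int backup(List<String> files) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger downloaded = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(WORKERS + 1);

        // Scanner thread: builds the file list, then tells every worker to stop.
        pool.execute(() -> {
            queue.addAll(files); // stands in for walking the FTP directories
            for (int i = 0; i < WORKERS; i++) queue.add(STOP);
        });

        // Worker threads: each would open its own connection before the loop.
        for (int i = 0; i < WORKERS; i++) {
            pool.execute(() -> {
                try {
                    for (String f = queue.take(); f != STOP; f = queue.take()) {
                        downloaded.incrementAndGet(); // stands in for download(...)
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown(); // the five long-running jobs are already submitted
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return downloaded.get();
    }

    public static void main(String[] args) {
        System.out.println(backup(Arrays.asList("a", "b", "c", "d", "e", "f"))); // prints 6
    }
}
```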