As the title, and assume the size of byte array is no larger than 16 Kbytes.
Currently I am implementing a middleware for MySQL (like MySQL Proxy), which requires high throughput. but the overhead caused by reading data from socket and writing data to socket. For now, I use
in = new DataInputStream(new BufferedInputStream(socket.getInputStream()))
and
out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()))
When read data and write, I use
in.read(byte[] b)
and out.write(byte[] b, int offset, int len)
with out.flush()
Can anyone tell me a better way to do this?
If you're writing byte arrays it doesn't make much difference. The network is the limiting factor, not the API. I think you're already doing it near-optimally. The most significant factor is the size of your socket send buffer in the kernel, and the socket receive buffer at the receiver.
You could investigate NIO and direct buffers, but I doubt you'll see a significant difference. Direct buffers are really for the case where you're just copying between channels, and the rest of NIO is really about scalability rather than performance over an individual channel.
Since you are just forwarding bytes, you could save a little time by not using DataInputStream, and instead just using BufferedInputStream.read() and BufferedOutputStream.write().
As EJP mentions, the network is the limiting factor. But that did not stop me trying to make the fastest implementation I could imagine without using NIO. The thing is, you can read from a socket while you write to another/the same socket. One thread cannot do this (either reads or writes) so multiple threads are needed. But without NIO, that requires a lot of threads (mostly sitting idle waiting on I/O though). NIO is a bit more complicated but is very good at using very few threads when there are a lot of connections with low volume (see the summary on this page of the article that Baldy mentions).
Anyway, below a non-NIO test class that you can update and use to see for yourself what is (not) the limiting factor.
public class SocketForwarder {
public static void main(String[] args) {
try {
new SocketForwarder().forward();
} catch (Exception e) {
e.printStackTrace();
}
}
public static final int portNumber = 54321;
public static final int maxSend = 1024 * 1024 * 100; // 100 MB
public static final int bufSize = 16 * 1024;
public static final int maxBufInMem = 128;
private static final SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss.SSS");
private final ExecutorService tp = Executors.newCachedThreadPool();
private final ArrayBlockingQueue<byte[]> bq = new ArrayBlockingQueue<byte[]>(maxBufInMem);
private final CountDownLatch allReceived = new CountDownLatch(1);
private Socket from, to, sender, receiver;
private int bytesSend, bytesReceived;
public void forward() throws Exception {
tp.execute(new Runnable() {
public void run() {
ServerSocket ss = null;
try {
ss = new ServerSocket(portNumber);
from = ss.accept();
to = ss.accept();
} catch (Exception e) {
e.printStackTrace();
} finally {
try { ss.close(); } catch (Exception ignored) {}
}
}
});
sender = new Socket(InetAddress.getLocalHost(), portNumber);
receiver = new Socket(InetAddress.getLocalHost(), portNumber);
// Setup proxy reader.
tp.execute(new Runnable() {
public void run() {
byte[] buf = new byte[bufSize];
try {
InputStream in = from.getInputStream();
int l = 0;
while ((l = in.read(buf)) > 0) {
byte[] bufq = new byte[l];
System.arraycopy(buf, 0, bufq, 0, l);
bq.put(bufq);
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// Setup proxy writer.
tp.execute(new Runnable() {
public void run() {
try {
OutputStream out = to.getOutputStream();
while (true) {
byte[] bufq = bq.take();
out.write(bufq);
out.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
// Start receiver.
tp.execute(new Runnable() {
public void run() {
byte[] buf = new byte[bufSize];
try {
InputStream in = receiver.getInputStream();
int l = 0;
while (bytesReceived < maxSend && (l = in.read(buf)) > 0) {
bytesReceived += l;
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(df.format(new Date()) + " bytes received: " + bytesReceived);
allReceived.countDown();
}
});
// Start sender.
tp.execute(new Runnable() {
public void run() {
Random random = new Random();
try {
OutputStream out = sender.getOutputStream();
System.out.println(df.format(new Date()) + " start sending.");
while (bytesSend < maxSend) {
byte[] buf = new byte[random.nextInt(bufSize)];
out.write(buf);
out.flush();
bytesSend += buf.length;
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Bytes send: " + bytesSend);
}
});
try {
allReceived.await();
} finally {
close(sender);
close(from);
close(to);
close(receiver);
tp.shutdownNow();
}
}
private static void close(Socket s) {
try { s.close(); } catch (Exception ignored) {}
}
}
It took my computer 2 seconds to transfer 100MB locally, expect a lot less when a network is involved.
For the best throughput you're going to want to use NIO and ByteBuffers. NIO keeps most of the work reading and writing to the sockets in native code and so can be much faster.
It is more complex to write good NIO code but depending on what kind of performance you're looking for, it can be worth the effort.
There are some good NIO examples out there along with some good introductions and comparisons. One resource I've used is http://tutorials.jenkov.com/java-nio/index.html.