I have a list of sockets that belongs to a single thread. Is there a feasible way to communicate (read from and write to) with those clients? I don't want to create one thread for each client, because there may be too many users and creating one thread for each of them may be too costly.
问题:
回答1:
I'd say NIO is your best bet here. Check out one of the many good tutorials for NIO communications over a socket (well, SocketChannel)!
I believe this is the tutorial that I used when I had to learn NIO: http://rox-xmlrpc.sourceforge.net/niotut/
回答2:
Netty - the Java NIO Client Server Socket Framework http://www.jboss.org/netty
回答3:
Just use standard Java NIO. The best documentation is on the Java main page: http://docs.oracle.com/javase/6/docs/technotes/guides/io/index.html. It has API documentation, samples and a tutorial - everything. I promise you it is enough - I have experience writing software where 10k clients were connected to one client (a few threads). You only need to remember the OS limits and raise them in the configuration.
回答4:
You can use the NIO approach in the JRE. Another solution is to use a Space Architecture. In this architecture there are global, named spaces: each request is written into a space, other threads read requests from that space, process them and write the results into another space, and in the final step the requesting thread reads its own result from the specified space.
You can see following link for more information:
http://en.wikipedia.org/wiki/Space_architecture
回答5:
I had to connect to multiple server IP:PORTs and do request-response messaging. After implementing traditional IO with multiple threads and a watchdog killing blocked sockets, I gave up. I made an NIO implementation, and this is my test application for future reference.
I can open N connections with a timeout, read a reply with a timeout and write a command with a timeout, all in a simple single-threaded "game loop". If I needed concurrency I could spawn worker threads, but that is not mandatory if the application logic does not need it.
Server is a custom telnet app, clients write a command and read text lines until a terminator line prompt is found. Terminator marks the end_of_response_packet.
import java.util.*;
import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
/**
 * Minimal non-blocking (NIO) client for a line-oriented request/response protocol.
 *
 * <p>One instance owns one {@link SocketChannel} and one {@link Selector}. Every operation
 * ({@link #open}, {@link #readUntil}, {@link #write}) takes a timeout in milliseconds that
 * bounds the total wall-clock time of that call and fails with an {@link IOException} when
 * the time is used up. The class performs no synchronization of its own.
 */
public class ClientSocketNIO {
    /** Size in bytes of the receive buffer and the initial size of the line accumulator. */
    private static final int BUFFER_SIZE = 1024;

    private final String host;
    private final int port;
    /** Charset name used for both encoding written data and decoding received lines. */
    private final String charset;
    /** Bytes of the line currently being received (no line feed). */
    private ByteArrayOutputStream inBuffer;
    /** Scratch buffer for a single channel read. */
    private ByteBuffer buf;
    private Selector selector;
    private SocketChannel channel;

    /**
     * Creates an unconnected client.
     *
     * @param host    remote host name or address
     * @param port    remote TCP port
     * @param charset charset name for text conversion; {@code null} or {@code ""} means UTF-8
     */
    public ClientSocketNIO(String host, int port, String charset) {
        this.charset = charset == null || charset.equals("") ? "UTF-8" : charset;
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the configured host and port. A connection left over from an earlier
     * {@code open} is closed first, and everything allocated here is released again if the
     * attempt fails.
     *
     * @param timeout maximum time to wait for the connection, in milliseconds
     * @throws IOException if the connection fails or is not established within {@code timeout}
     */
    public void open(long timeout) throws IOException {
        close(); // release a previous connection so that reopening does not leak it
        boolean connected = false;
        try {
            selector = Selector.open();
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
            inBuffer = new ByteArrayOutputStream(BUFFER_SIZE);
            buf = ByteBuffer.allocate(BUFFER_SIZE);

            long start = System.nanoTime();
            // connect() returns true when the connection is established immediately; in that
            // case there is nothing left to wait for.
            connected = channel.connect(new InetSocketAddress(host, port));
            while (!connected && awaitReady(key, SelectionKey.OP_CONNECT, start, timeout)) {
                connected = channel.finishConnect();
            }
            if (!connected) {
                throw new IOException("Connection timed out");
            }
            // Connected: stop selecting for OP_CONNECT until the next operation sets its own interest.
            key.interestOps(0);
        } finally {
            if (!connected) {
                close();
            }
        }
    }

    /**
     * Closes the channel and the selector and drops the buffers. Close errors are ignored.
     * Safe to call repeatedly and on a client that was never opened.
     */
    public void close() {
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException ignored) {
                // best-effort close: nothing useful can be done about a failure here
            }
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException ignored) {
                // best-effort close: nothing useful can be done about a failure here
            }
        }
        channel = null;
        selector = null;
        inBuffer = null;
        buf = null;
    }

    /**
     * Single-terminator convenience form of {@link #readUntil(String[], long, boolean)}.
     *
     * @param terminator prefix of the line that ends the response
     * @param timeout    maximum total time to wait, in milliseconds
     * @param trimLines  whether to {@link String#trim() trim} each returned line
     * @return the complete lines received before the terminator line
     * @throws IOException if the peer disconnects, the timeout elapses or the client is not open
     */
    public List<String> readUntil(String terminator, long timeout, boolean trimLines) throws IOException {
        return readUntil(new String[]{terminator}, timeout, trimLines);
    }

    /**
     * Reads lines separated by {@code '\n'} until a line begins with one of the terminators.
     *
     * <p>The terminator line itself is not part of the result, and it does not have to end
     * with a line feed: the method returns as soon as the last terminator byte has arrived.
     * Bytes received in the same read after the terminator are discarded.
     *
     * @param terminators candidate line prefixes that mark the end of the response;
     *                    none may be {@code null} or empty
     * @param timeout     maximum total time to wait, in milliseconds
     * @param trimLines   whether to {@link String#trim() trim} each returned line
     * @return the complete lines received before the terminator line
     * @throws IllegalArgumentException if {@code terminators} or one of its elements is
     *                                  {@code null} or empty
     * @throws IOException if the peer disconnects ("Socket disconnected"), the timeout
     *                     elapses ("Read timed out") or the client is not open
     */
    public List<String> readUntil(String[] terminators, long timeout, boolean trimLines) throws IOException {
        if (terminators == null) {
            throw new IllegalArgumentException("terminators must not be null");
        }
        byte[][] patterns = new byte[terminators.length][];
        for (int i = 0; i < terminators.length; i++) {
            if (terminators[i] == null) {
                throw new IllegalArgumentException("terminator " + i + " must not be null");
            }
            patterns[i] = terminators[i].getBytes(charset);
            if (patterns[i].length == 0) {
                throw new IllegalArgumentException("terminator " + i + " must not be empty");
            }
        }
        ensureOpen();

        List<String> lines = new ArrayList<String>(12);
        inBuffer.reset();
        // matched[t] is the number of leading bytes of the current line that equal patterns[t];
        // it drops to 0 on the first mismatch and can only grow again on a new line.
        int[] matched = new int[patterns.length];
        int column = 0; // index of the next byte within the current line

        SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
        long start = System.nanoTime();
        while (awaitReady(key, SelectionKey.OP_READ, start, timeout)) {
            buf.clear();
            int len = channel.read(buf);
            System.out.println("read " + len);
            if (len == -1) {
                throw new IOException("Socket disconnected");
            }
            buf.flip();
            while (buf.hasRemaining()) {
                byte b = buf.get();
                if (b == '\n') {
                    String line = inBuffer.toString(charset);
                    lines.add(trimLines ? line.trim() : line);
                    inBuffer.reset();
                    column = 0;
                    Arrays.fill(matched, 0);
                    continue;
                }
                inBuffer.write(b);
                for (int t = 0; t < patterns.length; t++) {
                    byte[] pattern = patterns[t];
                    if (matched[t] == column && pattern[column] == b) {
                        matched[t]++;
                        if (matched[t] == pattern.length) {
                            return lines;
                        }
                    } else {
                        matched[t] = 0;
                    }
                }
                column++;
            }
        }
        throw new IOException("Read timed out");
    }

    /**
     * Encodes {@code data} with the configured charset and writes all of it to the socket.
     *
     * @param data    text to send
     * @param timeout maximum total time to wait, in milliseconds
     * @throws IOException if the data cannot be written completely within {@code timeout}
     *                     ("Write timed out"), the write fails or the client is not open
     */
    public void write(String data, long timeout) throws IOException {
        ensureOpen();
        ByteBuffer outBuffer = ByteBuffer.wrap(data.getBytes(charset));
        SelectionKey key = channel.register(selector, SelectionKey.OP_WRITE);
        long start = System.nanoTime();
        while (awaitReady(key, SelectionKey.OP_WRITE, start, timeout)) {
            int len = channel.write(outBuffer); // may be a partial write; loop until drained
            System.out.println("write " + len);
            if (!outBuffer.hasRemaining()) {
                return;
            }
        }
        throw new IOException("Write timed out");
    }

    /** Fails with an IOException when there is no channel/selector to operate on. */
    private void ensureOpen() throws IOException {
        if (channel == null || selector == null) {
            throw new IOException("Socket is not open");
        }
    }

    /**
     * Blocks until {@code key} is ready for {@code op} or the time budget is used up.
     *
     * @param key           selection key of the channel
     * @param op            the {@code SelectionKey.OP_*} bit to wait for
     * @param startNanos    {@link System#nanoTime()} value taken when the operation started
     * @param timeoutMillis total budget of the operation, counted from {@code startNanos}
     * @return {@code true} if the key is ready, {@code false} if the budget is exhausted
     * @throws InterruptedIOException if the calling thread is interrupted
     * @throws IOException            if selecting fails
     */
    private boolean awaitReady(SelectionKey key, int op, long startNanos, long timeoutMillis) throws IOException {
        while (true) {
            // Elapsed-time arithmetic keeps large timeouts from overflowing a deadline value.
            long remaining = timeoutMillis - (System.nanoTime() - startNanos) / 1000000L;
            if (remaining <= 0) {
                return false;
            }
            if (Thread.currentThread().isInterrupted()) {
                // select() returns immediately for an interrupted thread; fail instead of spinning.
                throw new InterruptedIOException("Interrupted while waiting for socket");
            }
            selector.select(remaining); // remaining > 0, so this never becomes the blocking select(0)
            Set<SelectionKey> selected = selector.selectedKeys();
            boolean ready = selected.contains(key) && key.isValid() && (key.readyOps() & op) != 0;
            selected.clear(); // selected keys must be removed by hand before the next select
            if (ready) {
                return true;
            }
        }
    }

    /** Prints every line of a reply between '|' markers. */
    private static void printReply(List<String> reply) {
        for (String line : reply) {
            System.out.println("|" + line + "|");
        }
    }

    /**
     * Demo session against a custom telnet-like server: log in, then run a few commands and
     * print each reply.
     */
    public static void main(String[] args) throws Exception {
        ClientSocketNIO client = new ClientSocketNIO("11.22.33.44", 1234, "UTF-8");
        try {
            client.open(15000);
            // read prompting for username
            List<String> reply = client.readUntil("User: ", 15000, true);
            printReply(reply);

            // write username and read a success or failed prompt (asks username once again),
            // this one may return two different terminator prompts so listen for both
            client.write("myloginname\n", 15000);
            reply = client.readUntil(new String[]{"> ", "User: "}, 15000, true);
            printReply(reply);
            // An empty reply means only a prompt came back, which is not a welcome either.
            if (reply.isEmpty() || !reply.get(reply.size() - 1).startsWith("Welcome ")) {
                return; // Access denied
            }

            System.out.println("-----");
            client.write("help\n", 15000);
            reply = client.readUntil("> ", 15000, true);
            printReply(reply);

            System.out.println("-----");
            client.write("get status\n", 15000);
            reply = client.readUntil("> ", 15000, true);
            printReply(reply);

            System.out.println("-----");
            client.write("get list\n", 15000);
            reply = client.readUntil("> ", 15000, true);
            printReply(reply);

            client.write("quit\n", 15000);
        } finally {
            client.close();
        }
    }
}