I am implementing a Netty proxy server that works as follows. When an HTTP request comes in:
- if the local cache has the data, write it to the channel and flush
- if not, fetch the data from the remote server, add it to the cache, and flush
I am having difficulty extracting the ByteBuf from the response in the same handler where I write to the client.
In the example below, the channelRead method of HexDumpProxyFrontendHandler shows how I fetch from the cache and write. I have added comments in that method at the point where I am stuck.
The code works end to end, so it can be copied and tested locally.
I can see the FullHttpResponse object in HexDumpProxyBackendHandler#channelRead, but inside this method I have no reference to the cache or to the id I want to store in it.
I can think of two ways to solve this, but I am not clear on how to implement either:
1) Get the cache reference and the id into HexDumpProxyBackendHandler, after which it becomes easy. However, HexDumpProxyBackendHandler is instantiated in channelActive of HexDumpProxyFrontendHandler, at which point I have not yet parsed the incoming request.
2) Extract the response ByteBuf in HexDumpProxyFrontendHandler#channelRead, in which case it is just a cache insertion.
HexDumpProxy.java
public final class HexDumpProxy {
static final int LOCAL_PORT = Integer.parseInt(System.getProperty("localPort", "8082"));
static final String REMOTE_HOST = System.getProperty("remoteHost", "api.icndb.com");
static final int REMOTE_PORT = Integer.parseInt(System.getProperty("remotePort", "80"));
static Map<Long,String> localCache = new HashMap<>();
public static void main(String[] args) throws Exception {
System.err.println("Proxying *:" + LOCAL_PORT + " to " + REMOTE_HOST + ':' + REMOTE_PORT + " ...");
localCache.put(123L, "profile1");
localCache.put(234L, "profile2");
// Configure the bootstrap.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new HexDumpProxyInitializer(localCache, REMOTE_HOST, REMOTE_PORT))
.childOption(ChannelOption.AUTO_READ, false)
.bind(LOCAL_PORT).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
HexDumpProxyInitializer.java
public class HexDumpProxyInitializer extends ChannelInitializer<SocketChannel> {
private final String remoteHost;
private final int remotePort;
private Map<Long, String> cache;
public HexDumpProxyInitializer(Map<Long,String> cache, String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
this.cache=cache;
}
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(
new LoggingHandler(LogLevel.INFO),
new HttpServerCodec(),
new HttpObjectAggregator(8*1024, true),
new HexDumpProxyFrontendHandler(cache, remoteHost, remotePort));
}
}
HexDumpProxyFrontendHandler.java
public class HexDumpProxyFrontendHandler extends ChannelInboundHandlerAdapter {
private final String remoteHost;
private final int remotePort;
private Channel outboundChannel;
private Map<Long, String> cache;
public HexDumpProxyFrontendHandler(Map<Long, String> cache, String remoteHost, int remotePort) {
this.remoteHost = remoteHost;
this.remotePort = remotePort;
this.cache = cache;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
final Channel inboundChannel = ctx.channel();
// Start the connection attempt.
Bootstrap b = new Bootstrap();
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler((new ChannelInitializer() {
protected void initChannel(Channel ch) {
ChannelPipeline var2 = ch.pipeline();
var2.addLast((new HttpClientCodec()));
var2.addLast(new HttpObjectAggregator(8192, true));
var2.addLast(new HexDumpProxyBackendHandler(inboundChannel));
}
}))
.option(ChannelOption.AUTO_READ, false);
ChannelFuture f = b.connect(remoteHost, remotePort);
outboundChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// connection complete start to read first data
inboundChannel.read();
} else {
// Close the connection if the connection attempt has failed.
inboundChannel.close();
}
}
});
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
System.out.println("msg is instanceof httpRequest");
HttpRequest req = (HttpRequest)msg;
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
String userId = queryStringDecoder.parameters().get("id").get(0);
Long id = Long.valueOf(userId);
if (cache.containsKey(id)){
StringBuilder buf = new StringBuilder();
buf.append(cache.get(id));
writeResponse(req, ctx, buf);
closeOnFlush(ctx.channel());
return;
}
}
if (outboundChannel.isActive()) {
outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// was able to flush out data, start to read the next chunk
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
//get response back from HexDumpProxyBackendHander and write to cache
//basically I need to do cache.put(id, parse(response));
//how to get response buf from inboundChannel here is the question I am trying to solve
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (outboundChannel != null) {
closeOnFlush(outboundChannel);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
closeOnFlush(ctx.channel());
}
/**
* Closes the specified channel after all queued write requests are flushed.
*/
static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
}
//borrowed from HttpSnoopServerHandler.java in snoop example
private boolean writeResponse(HttpRequest request, ChannelHandlerContext ctx, StringBuilder buf) {
// Decide whether to close the connection or not.
boolean keepAlive = HttpUtil.isKeepAlive(request);
// Build the response object.
FullHttpResponse response = new DefaultFullHttpResponse(
HTTP_1_1, request.decoderResult().isSuccess()? OK : BAD_REQUEST,
Unpooled.copiedBuffer(buf.toString(), CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
if (keepAlive) {
// Add 'Content-Length' header only for a keep-alive connection.
response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
// Add keep alive header as per:
// - http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
// Encode the cookie.
String cookieString = request.headers().get(HttpHeaderNames.COOKIE);
if (cookieString != null) {
Set<Cookie> cookies = ServerCookieDecoder.STRICT.decode(cookieString);
if (!cookies.isEmpty()) {
// Reset the cookies if necessary.
for (io.netty.handler.codec.http.cookie.Cookie cookie: cookies) {
response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode(cookie));
}
}
} else {
// Browser sent no cookie. Add some.
response.headers().add(HttpHeaderNames.SET_COOKIE, io.netty.handler.codec.http.cookie.ServerCookieEncoder.STRICT.encode("key1", "value1"));
response.headers().add(HttpHeaderNames.SET_COOKIE, ServerCookieEncoder.STRICT.encode("key2", "value2"));
}
// Write the response.
ctx.write(response);
return keepAlive;
}
}
HexDumpProxyBackendHandler.java
public class HexDumpProxyBackendHandler extends ChannelInboundHandlerAdapter {
private final Channel inboundChannel;
public HexDumpProxyBackendHandler(Channel inboundChannel) {
this.inboundChannel = inboundChannel;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.read();
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (msg instanceof FullHttpResponse) {
System.out.println("this is fullHttpResponse");
}
inboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
HexDumpProxyFrontendHandler.closeOnFlush(inboundChannel);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
HexDumpProxyFrontendHandler.closeOnFlush(ctx.channel());
}
}
P.S. I have taken most of the code from the netty-example project and customized it.
EDIT
Per Ferrygig's suggestions, I changed HexDumpProxyFrontendHandler#channelRead as follows. I removed channelActive and implemented the write method.
@Override public void channelRead(final ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
System.out.println("msg is instanceof httpRequest");
HttpRequest req = (HttpRequest)msg;
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(req.uri());
String userId = queryStringDecoder.parameters().get("id").get(0);
id = Long.valueOf(userId);
if (cache.containsKey(id)){
StringBuilder buf = new StringBuilder();
buf.append(cache.get(id));
writeResponse(req, ctx, buf);
closeOnFlush(ctx.channel());
return;
}
final Channel inboundChannel = ctx.channel();
//copied from channelActive method
// Start the connection attempt.
Bootstrap b = new Bootstrap();
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler((new ChannelInitializer() {
protected void initChannel(Channel ch) {
ChannelPipeline var2 = ch.pipeline();
var2.addLast((new HttpClientCodec()));
var2.addLast(new HttpObjectAggregator(8192, true));
var2.addLast(new HexDumpProxyBackendHandler(inboundChannel, cache));
}
}));
//.option(ChannelOption.AUTO_READ, false);
ChannelFuture f = b.connect(remoteHost, remotePort);
outboundChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// connection complete start to read first data
inboundChannel.read();
} else {
// Close the connection if the connection attempt has failed.
inboundChannel.close();
}
}
});
}
if (outboundChannel.isActive()) {
outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
// was able to flush out data, start to read the next chunk
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
}
There are multiple ways to approach this problem, and the best one depends on your ultimate goal.
At the moment, you are using a topology in which one inbound connection maps to one outbound connection. This makes the system design slightly easier, as you don't have to worry about syncing multiple requests onto the same outbound stream.
At the moment, your frontend handler extends ChannelInboundHandlerAdapter, which only intercepts "packets" coming into your application. If we make it extend ChannelDuplexHandler, we can also handle "packets" going out of the application.
To take this path, we need to update the HexDumpProxyFrontendHandler class to extend ChannelDuplexHandler (let's call it CDH for now).
The next step is to override the write method inherited from the CDH, so we can intercept the response when the backend sends it back to us.
After we have created the write method, we need to update our (non-thread-safe) map by calling the put method.
We are not done yet. While the code is now in place, we still need to fix a few bugs elsewhere in the code.
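The write-interception steps above could look roughly like the following sketch. It is not the original code from this answer: the class name CachingFrontendSketch, the pendingId field, and the rememberPendingId hook are hypothetical names introduced for illustration, and it assumes that the response body is UTF-8 text and that only one request is in flight per connection.

```java
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.util.CharsetUtil;

import java.util.Map;

/**
 * Sketch only: remembers the id of the request that missed the cache and
 * stores the body of the next FullHttpResponse written to the client channel.
 */
class CachingFrontendSketch extends ChannelDuplexHandler {
    private final Map<Long, String> cache;
    // Hypothetical field: id of the cache miss currently waiting for the backend.
    private Long pendingId;

    CachingFrontendSketch(Map<Long, String> cache) {
        this.cache = cache;
    }

    /** Hypothetical hook: channelRead would call this when the id is not cached. */
    void rememberPendingId(long id) {
        pendingId = id;
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof FullHttpResponse && pendingId != null) {
            FullHttpResponse response = (FullHttpResponse) msg;
            // toString(Charset) leaves the reader index untouched, so the
            // body can still be encoded and sent to the client afterwards.
            cache.put(pendingId, response.content().toString(CharsetUtil.UTF_8));
            pendingId = null;
        }
        // Always pass the message on, otherwise the client never gets a response.
        super.write(ctx, msg, promise);
    }
}
```

Because the backend handler calls inboundChannel.writeAndFlush(msg), the response enters the inbound channel's pipeline at its tail and passes through this write method. The cache-hit path in writeResponse uses ctx.write(response), which starts at the handler's own context and therefore skips it.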
Non thread-safe map (critical bug)
One of those bugs is that you are using a normal HashMap for your cache. It is not thread safe: if multiple people connect to your app at the same time, weird things may happen, including full map corruption as the internal structure of the map updates.
To counter this issue, we are going to "upgrade" the map to a ConcurrentHashMap. This map has special structures in place to deal with multiple threads requesting and storing data at the same time, without a huge loss in performance. (If performance is a main concern, you might get higher performance by using a per-thread hash map instead of a global cache, but this means that every resource can be cached as many times as there are threads.)
No cache removal rules (major bug)
At the moment, there is no code in place to remove outdated resources. This means the cache will keep filling up until the program has no memory left, at which point it will crash spectacularly.
This can be solved either by using a map implementation that provides both thread-safe access and so-called removal rules, or by using a pre-made caching solution such as Guava caches.
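As one possible illustration of a map with both thread-safe access and a removal rule, here is a minimal sketch using only the JDK. The BoundedCache class and its create method are hypothetical names, and the rule shown (evict the least recently used entry once a size limit is exceeded) is just one of many possible policies.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch only: a size-bounded, least-recently-used cache built on the JDK. */
final class BoundedCache {
    private BoundedCache() {
    }

    static <K, V> Map<K, V> create(final int maxEntries) {
        // accessOrder = true keeps entries ordered from least to most recently used.
        Map<K, V> lru = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                // Called after every put; returning true drops the eldest entry.
                return size() > maxEntries;
            }
        };
        // In access order even get() reorders the map, so every call must be
        // synchronized; the wrapper takes care of that for single operations.
        return Collections.synchronizedMap(lru);
    }
}
```

A call such as BoundedCache.create(10_000) could then replace new HashMap<>() in HexDumpProxy. The single lock makes this simpler but less scalable than a ConcurrentHashMap or a dedicated caching library.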
Failure to handle HTTP Pipelining properly (minor-major bug)
One of the lesser-known features of HTTP is pipelining. This basically means that the client can send another request to the server without waiting for the response to the previous request. Bugs of this type include servers that swap the content of both requests around, or even mangle them completely.
While pipelined requests are rare these days, given growing HTTP/2 support and the knowledge that there are broken servers out there, they still happen with certain CLI tools.
To solve this issue, ONLY read a request AFTER you have sent the previous response. One way to do this is to keep a list of requests; another is to go for a more advanced pre-made solution.
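The "list of requests" idea could be sketched as below. ResponseSequencer and its methods are hypothetical names, not part of Netty. The sketch assumes it is only ever touched from the event loop thread of the inbound channel, so it does no locking.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Sketch only: releases responses in the order in which their requests arrived. */
final class ResponseSequencer<R> {
    /** Ticket handed out for each in-flight request. */
    static final class Ticket<R> {
        private R response;
        private boolean done;
    }

    private final Deque<Ticket<R>> inFlight = new ArrayDeque<>();

    /** Call when a request has been read; keep the ticket until its response is ready. */
    Ticket<R> requestArrived() {
        Ticket<R> ticket = new Ticket<>();
        inFlight.addLast(ticket);
        return ticket;
    }

    /**
     * Call when the response for a ticket is ready. Returns the responses that
     * may be written now, in request order (empty if an earlier one is still pending).
     */
    List<R> responseReady(Ticket<R> ticket, R response) {
        ticket.response = response;
        ticket.done = true;
        List<R> writable = new ArrayList<>();
        while (!inFlight.isEmpty() && inFlight.peekFirst().done) {
            writable.add(inFlight.pollFirst().response);
        }
        return writable;
    }
}
```

With this in place, a cache hit for the second pipelined request is held back until the backend response for the first request has been written, so the client never sees the two swapped.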
storm
I may be wrong, but when I read this part of your HexDumpProxyFrontendHandler, I feel that something is probably incorrect (I moved my comments a little bit ahead of where correct style would put them, to make them visible):
To me, it looks like you didn't wait for the channel to be opened. You also lack logging at the point where you send to the wire, which would confirm that you really send something (in the logs, we can only see that the connection is opened and then mainly closed, with nothing in between).
Maybe some more logs could help us and you?
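If the problem is indeed that the request is written before the outbound connection is open, one option is to defer the write until the connect future completes, and to log at that point. The following is a minimal sketch under that assumption. DeferredWrite and writeWhenConnected are hypothetical names, and the println stands in for whatever logging you prefer.

```java
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;

/** Sketch only: forwards a message once the outbound connect attempt has succeeded. */
final class DeferredWrite {
    private DeferredWrite() {
    }

    /** Returns a future that completes when the deferred write itself has completed. */
    static ChannelFuture writeWhenConnected(ChannelFuture connectFuture, final Object msg) {
        final ChannelPromise written = connectFuture.channel().newPromise();
        connectFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // Log right before sending, to prove that something goes on the wire.
                    System.out.println("connected, forwarding " + msg.getClass().getSimpleName());
                    future.channel().writeAndFlush(msg, written);
                } else {
                    // The message will never be sent, so release it to avoid a leak.
                    ReferenceCountUtil.release(msg);
                    written.setFailure(future.cause());
                }
            }
        });
        return written;
    }
}
```

In the edited channelRead, this would mean passing the future returned by b.connect(remoteHost, remotePort) together with msg, instead of checking outboundChannel.isActive() right after the connect call, when the channel is usually not active yet.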