Is there a way to return a custom promise from cha

Posted 2019-07-19 17:18

I'm currently implementing a privacy-preserving data mining algorithm. For the communication between the different parties I'm using Netty 4.0. The communication flow between the parties looks like this:

         -- multiplicationMsg --> ... -- multiplicationMsg -->
   P_{1}                                                       P_{N}
         <-- multiplicationMsg -- ... <-- multiplicationMsg --

where P_{1} is the master party that initiates and controls the whole computation. The logic for the secure multi-party multiplication is located in Netty ChannelHandlers. There is also another protocol for secure addition.

At the moment I use a solution similar to this one, shown by Norman Maurer from the Netty core team, to be notified when a sub-protocol computation has finished. But that feels a bit like fighting against the framework.

Is there a way to get a custom promise from channel.write(msg) that is created and fulfilled in the ChannelPipeline? In my example above, it should be fulfilled when multiplicationMsg arrives back at P_{1}.

Edit 1

This is what I normally do to write a message from outside of the ChannelPipeline:

ChannelFuture f = channel.write(msg);
f.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
         // do something with the future
    }
});

The ChannelFuture f from the example above is fulfilled once the data has been written to the socket or a failure occurs. But I need a way to get back a custom Future in addition to the ChannelFuture, something like:

ChannelFuture f = channel.write(msg);
f.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) {
         // I need something like the following
         // (getMyFuture() does not exist; it is the method I wish I had)
         if (future.isSuccess()) {
             Future myFuture = future.getMyFuture();
         }
    }
});

Tags: netty
1 answer
smile是对你的礼貌
#2 · 2019-07-19 17:45

There are many ways to do it; here's one example built on top of Netty:

From outside the pipeline, send the message using a public method inside a class (let's say, IoClient) containing the ChannelFuture (from the connection initialization). The method would look something like this:

public MyCustomFuture send(String msg) {
  MyCustomFuture responseFuture = new MyCustomFuture();

  channelFuture.channel().pipeline().get(MyAppClientHandler.class).setResponseFuture(responseFuture);
  channelFuture.channel().writeAndFlush(msg);   

  return responseFuture;
}

MyCustomFuture is a custom class that implements Netty's Future interface, so its instance acts as a proxy for the reply to our message. MyAppClientHandler is the Netty handler that fulfills the promise (held in responseFuture), and .setResponseFuture(...) hands the proxy to that handler.

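Depending on how far the channel's initialization has progressed, channelFuture.channel() or the handler lookup on its pipeline might still return null, giving us a NullPointerException. So we change the code above to insert the proxy from inside a callback that runs once the connection attempt has completed: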

public MyCustomFuture send(final String msg) {
  final MyCustomFuture responseFuture = new MyCustomFuture();

  // The listener runs once the connection attempt has completed.
  channelFuture.addListener(new GenericFutureListener<ChannelFuture>() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
      if (!future.isSuccess()) {
        return; // connect failed; responseFuture is never fulfilled in this sketch
      }
      future.channel().pipeline()
          .get(MyAppClientHandler.class).setResponseFuture(responseFuture);
      future.channel().writeAndFlush(msg);
    }
  });

  return responseFuture;
}

One more thing about MyCustomFuture is that it will need a setter method:

public void set(String msg) throws InterruptedException {
  if (state == State.DONE) {
    return;
  }
  blockingReplyHolder.put(msg);
  state = State.DONE;
}

blockingReplyHolder, as the name suggests, is the field in the implementation that holds the message fulfilling the promise; reading from it blocks until that message exists (see the Future interface).
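To make the role of blockingReplyHolder concrete, here is a minimal sketch of what such a class could look like. It is not the answer's actual implementation: to stay self-contained it implements java.util.concurrent.Future<String> rather than Netty's larger Future interface, and the State enum, the one-slot ArrayBlockingQueue, and the lack of cancellation support are all assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: implements java.util.concurrent.Future, not Netty's Future.
class MyCustomFuture implements Future<String> {
    private enum State { WAITING, DONE } // assumed states

    private volatile State state = State.WAITING;
    // One-slot queue: take() blocks until set() has stored the reply.
    private final BlockingQueue<String> blockingReplyHolder = new ArrayBlockingQueue<>(1);

    // Fulfills the future; only the first call has an effect.
    public synchronized void set(String msg) throws InterruptedException {
        if (state == State.DONE) {
            return;
        }
        blockingReplyHolder.put(msg);
        state = State.DONE;
    }

    @Override
    public String get() throws InterruptedException {
        String reply = blockingReplyHolder.take();
        blockingReplyHolder.offer(reply); // put it back so later get() calls also return
        return reply;
    }

    @Override
    public String get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        String reply = blockingReplyHolder.poll(timeout, unit);
        if (reply == null) {
            throw new TimeoutException("no reply within timeout");
        }
        blockingReplyHolder.offer(reply); // keep the reply available for later calls
        return reply;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false; // cancellation is not supported in this sketch
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return state == State.DONE;
    }
}
```

The timed get(...) is worth having in practice: if the reply never arrives, the caller gets a TimeoutException instead of blocking forever.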

Right. Now, when the expected message reaches the handler MyAppClientHandler, we can fulfill the promise like this:

// Assumes MyAppClientHandler extends SimpleChannelInboundHandler<String>
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    responseFuture.set(msg);
}
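Note that the handler holds a single responseFuture, so a second send(...) issued before the first reply arrives would overwrite the first future. If several requests can be in flight, one possible variation is to keep the pending futures in a FIFO queue. The sketch below assumes that replies arrive in the same order as the requests were written; PendingReplies is a hypothetical helper, not part of Netty, and it uses the JDK's CompletableFuture (Java 8+) in place of MyCustomFuture.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper: tracks futures for requests whose replies are still outstanding.
// Assumes replies come back in the order the requests were sent.
class PendingReplies {
    private final Queue<CompletableFuture<String>> pending = new ConcurrentLinkedQueue<>();

    // Call right before writing a request; returns the future for its reply.
    CompletableFuture<String> register() {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.add(future);
        return future;
    }

    // Call from the inbound handler when a reply arrives.
    // Returns false if no request was waiting for a reply.
    boolean complete(String reply) {
        CompletableFuture<String> future = pending.poll();
        return future != null && future.complete(reply);
    }
}
```

With this in place, send(...) would call register() instead of setResponseFuture(...), and the read method would call complete(msg). Registration and the write have to happen in the same order for every request, for example by doing both on the channel's event loop.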

The resulting custom API's usage would be:

MyCustomFuture future = ioClient.send(message);
// do other stuff if needed
String response = future.get(); // waits if necessary
// make use of the response

This answer was born from an example I was toying with.
