How to get server response with netty client

I want to write a Netty-based client. It should have a method public String sendMessage(String msg); that returns the response from the server, or a future for it; either is fine. It should also be usable from multiple threads. Something like this:

public class Client {
public static void main(String[] args) throws InterruptedException {
    Client client = new Client();

}

private Channel channel;

public Client() throws InterruptedException {
    EventLoopGroup loopGroup = new NioEventLoopGroup();

    Bootstrap b = new Bootstrap();
    b.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringDecoder()).
                    addLast(new StringEncoder()).
                    addLast(new ClientHandler());
        }
    });
    channel = b.connect("localhost", 9091).sync().channel();
}

public String sendMessage(String msg) {
    channel.writeAndFlush(msg);
    return ??????????;
}

}

What I don't understand is how to retrieve the response from the server after I invoke writeAndFlush(). What should I do?

I am using Netty 4.0.18.Final.

Drake answered 17/4, 2014 at 8:30 Comment(0)

Returning a Future<String> from the method is straightforward. We are going to implement the following method signature:

public Future<String> sendMessage(String msg) {

This is relatively easy to do once you are familiar with asynchronous programming constructs. To solve the design problem, we take the following steps:

  1. When a message is written, add a Promise<String> to an ArrayBlockingQueue<Promise<String>>.

    The queue records which messages have been sent and are still awaiting a reply, and lets us complete the matching Future<String> later.

  2. When a message arrives back in the handler, resolve it against the head of the queue.

    Assuming the server replies in the order the requests were sent, the head of the queue is the future that belongs to this reply.

  3. Update the state of the Promise<String>.

    We call promise.setSuccess(msg) to set the result on the object; this propagates to the future held by the caller.
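
The three steps above can be sketched without Netty at all. The following is only an illustration under stated assumptions: it uses the JDK's CompletableFuture as a stand-in for Netty's Promise, and the class name PendingReplies and its methods are hypothetical, not part of any library. It also assumes replies arrive in request order.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper: pairs each reply with the oldest unanswered request (FIFO). */
class PendingReplies {
    private final Queue<CompletableFuture<String>> pending = new ArrayDeque<>();
    private final int capacity;
    private boolean closed;

    PendingReplies(int capacity) {
        this.capacity = capacity;
    }

    /** Step 1: call when a request is written; the returned future completes with its reply. */
    synchronized CompletableFuture<String> register() {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (closed) {
            reply.completeExceptionally(new IllegalStateException("connection closed"));
        } else if (pending.size() >= capacity) {
            reply.completeExceptionally(new IllegalStateException("too many requests in flight"));
        } else {
            pending.add(reply);
        }
        return reply;
    }

    /** Steps 2 and 3: call when a reply arrives; completes the head of the queue. */
    synchronized void onReply(String msg) {
        CompletableFuture<String> head = pending.poll();
        if (head != null) {
            head.complete(msg);
        }
    }

    /** Call when the connection drops; fails every request that is still waiting. */
    synchronized void close(Throwable cause) {
        closed = true;
        CompletableFuture<String> waiting;
        while ((waiting = pending.poll()) != null) {
            waiting.completeExceptionally(cause);
        }
    }
}
```

In a real handler, register() would be called next to writeAndFlush(), onReply() from the inbound read callback, and close() from channelInactive().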

Example code

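import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    // volatile: written on the event loop, read by the threads calling sendMessage
    private volatile ChannelHandlerContext ctx;
    private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.ctx = ctx;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        synchronized(this){
            // Fail every request that is still waiting for a reply
            Promise<String> prom;
            while((prom = messageList.poll()) != null)
                prom.setFailure(new IOException("Connection lost"));
            messageList = null;
        }
    }

    public Future<String> sendMessage(String message) {
        if(ctx == null)
            throw new IllegalStateException("Channel is not active yet");
        return sendMessage(message, ctx.executor().<String>newPromise());
    }

    public Future<String> sendMessage(String message, final Promise<String> prom) {
        synchronized(this){
            if(messageList == null) {
                // Connection closed
                prom.setFailure(new IllegalStateException());
            } else if(messageList.offer(prom)) {
                // Connection open and message accepted; fail the promise if the write fails
                ctx.writeAndFlush(message).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) {
                        if(!future.isSuccess()) {
                            synchronized(ClientHandler.this) {
                                if(messageList != null)
                                    messageList.remove(prom);
                            }
                            prom.tryFailure(future.cause());
                        }
                    }
                });
            } else {
                // Connection open and message rejected
                prom.setFailure(new BufferOverflowException());
            }
            return prom;
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        // Named channelRead0 in Netty 4.x (messageReceived in the Netty 5 alphas)
        synchronized(this){
            if(messageList != null) {
                Promise<String> prom = messageList.poll();
                if(prom != null)
                    prom.setSuccess(msg);
            }
        }
    }
}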

Documentation breakdown

  • private ChannelHandlerContext ctx;

    Stores our reference to the ChannelHandlerContext; we use it to create promises and to write messages.

  • private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);

    Holds the promises of messages that are still awaiting a reply, so that we can complete the matching future later.

  • public void channelActive(ChannelHandlerContext ctx)

    Called by Netty when the connection becomes active. We initialize our variables here.

  • public void channelInactive(ChannelHandlerContext ctx)

    Called by Netty when the connection becomes inactive, either due to an error or a normal connection close.

  • protected void messageReceived(ChannelHandlerContext ctx, String msg)

    Called by Netty when a new message arrives (in Netty 4.x this callback is named channelRead0). Here we take the head of the queue and call setSuccess on it.

Warning

When using futures, there is one thing to watch out for: do not call get() from one of Netty's event loop threads if the future is not done yet. Breaking this rule results in either a deadlock or a BlockingOperationException.
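
The non-blocking alternative to get() is to register a callback that runs once the reply is available. In Netty that is done with addListener(...) on the future. The sketch below shows the same idea with the JDK's CompletableFuture as a stand-in, so that it runs without Netty; the class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical example: react to a reply with a callback instead of blocking on get(). */
class ReplyCallbacks {
    /** Returns a future for the reply's length without blocking the calling thread. */
    static CompletableFuture<Integer> replyLength(CompletableFuture<String> reply) {
        // thenApply only registers the function; it runs when the reply completes
        return reply.thenApply(String::length);
    }

    public static void main(String[] args) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        CompletableFuture<Integer> length = replyLength(reply);
        // Nothing has blocked so far; the callback fires on completion
        length.thenAccept(n -> System.out.println("reply length: " + n));
        reply.complete("pong");
    }
}
```

Code written this way is safe to run on an event loop thread, because no thread ever waits for the future.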

Equipment answered 10/2, 2016 at 14:37 Comment(6)
There are two caveats to this: 1) The protocol used must guarantee that responses will be sent by the server in the order that requests are received and 2) requests will only be sent to and received from a single server (otherwise 1 will break since the ordering among the various servers may no longer be synchronous). Since a single Bootstrap can be used to connect to multiple servers the second can be a concern, although each connect will produce its own channel so it should be possible to have separate queues for each channel to address this assuming (1) does hold.Cards
What about memory visibility -- is the assignment of ctx in channelActive guaranteed to be seen by the thread calling sendMessage(String)?Blinking
Anyone else getting this error? In return sendMessage(message, ctx.newPromise()); newPromise is of type io.netty.channel.ChannelPromise when required is io.netty.util.concurrent.Promise<String>. After some casts Im getting ClassCastException: java.lang.String cannot be cast to java.lang.Void.Redhanded
I am also getting same compile error at return sendMessage(message, ctx.newPromise()); linePersas
@Persas it was supposed to be `ctx.executor().newPromise()` instead of ctx.newPromise(). I am not sure how the old code compiled in the first place; maybe I used a different Netty version.Equipment
Seems like a good answer but I am not seeing how ctx.writeAndFlush(message).addListener(); actually does anything (perhaps the API changed since this was written but there is no empty addListener() method and this branch does nothing to the promise. :-(Cadre
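
If the protocol cannot guarantee that replies arrive in request order (the first caveat raised in the comments above), one common alternative is to tag each request with an id and look the future up by that id when the reply arrives. The sketch below is only an illustration: it assumes the wire protocol can carry such an id, it uses the JDK's CompletableFuture rather than Netty's Promise, and the class CorrelatedReplies is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper: matches replies to requests by id instead of by arrival order. */
class CorrelatedReplies {
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    private final AtomicLong ids = new AtomicLong();

    /** Allocates a fresh id to send along with the request. */
    long nextId() {
        return ids.incrementAndGet();
    }

    /** Call before writing the request; the future completes when a reply with this id arrives. */
    CompletableFuture<String> register(long id) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(id, reply);
        return reply;
    }

    /** Call when a reply arrives; returns false if no request is waiting under that id. */
    boolean onReply(long id, String msg) {
        CompletableFuture<String> reply = pending.remove(id);
        return reply != null && reply.complete(msg);
    }
}
```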

You can find a sample in the Netty project. The result can be stored in a field of the last handler in the pipeline. In the following code, handler.getFactorial() returns the value we want.

Refer to http://www.lookatsrc.com/source/io/netty/example/factorial/FactorialClient.java?a=io.netty:netty-all

FactorialClient.java

public final class FactorialClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8322"));
    static final int COUNT = Integer.parseInt(System.getProperty("count", "1000"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new FactorialClientInitializer(sslCtx));

            // Make a new connection.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Get the handler instance to retrieve the answer.
            FactorialClientHandler handler =
                (FactorialClientHandler) f.channel().pipeline().last();

            // Print out the answer.
            System.err.format("Factorial of %,d is: %,d", COUNT, handler.getFactorial());
        } finally {
            group.shutdownGracefully();
        }
    }
}

public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteger> {

    private ChannelHandlerContext ctx;
    private int receivedMessages;
    private int next = 1;
    final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<BigInteger>();

    public BigInteger getFactorial() {
        boolean interrupted = false;
        try {
            for (;;) {
                try {
                    return answer.take();
                } catch (InterruptedException ignore) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        this.ctx = ctx;
        sendNumbers();
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, final BigInteger msg) {
        receivedMessages ++;
        if (receivedMessages == FactorialClient.COUNT) {
            // Offer the answer after closing the connection.
            ctx.channel().close().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    boolean offered = answer.offer(msg);
                    assert offered;
                }
            });
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    private void sendNumbers() {
        // Do not send more than 4096 numbers.
        ChannelFuture future = null;
        for (int i = 0; i < 4096 && next <= FactorialClient.COUNT; i++) {
            future = ctx.write(Integer.valueOf(next));
            next++;
        }
        if (next <= FactorialClient.COUNT) {
            assert future != null;
            future.addListener(numberSender);
        }
        ctx.flush();
    }

    private final ChannelFutureListener numberSender = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                sendNumbers();
            } else {
                future.cause().printStackTrace();
                future.channel().close();
            }
        }
    };
}
Frederic answered 25/4, 2016 at 9:25 Comment(0)

Calling channel.writeAndFlush(msg); already returns a ChannelFuture. This future completes when the write finishes; it does not carry the server's response. To handle the result of the call, you can add a listener to the future like this:

ChannelFuture future = channel.writeAndFlush(msg);
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture f) {
        // Runs once the write has completed (successfully or not)
        // ...
    }
});

(This is adapted from the Netty documentation.)

Dowser answered 23/4, 2014 at 14:3 Comment(4)
But how to get server response from ChannelFuture?Drake
You need to register a ChannelInboundHandler to the channel. As a matter of fact you probably did this already -> see your ClientHandler. This handler can implement a public void channelRead(ChannelHandlerContext ctx, Object msg) {...} method. It handles the responses from the server.Dowser
I understand basics of netty. On a server it's plain and easy. But I still don't get how to connect this code: public String sendMessage(String msg) { channel.writeAndFlush(msg); return ??????????; } with channelRead(...)Drake
Getting the server response in this method is not possible in netty. Data (in your case the response from the server) is handled by the InboundHandlers which are connected to a channel. In those handlers you can forward a server response to another part of your code. Always keep in mind that netty is an asynchronous framework!Dowser

Here is another solution. All you need is some familiarity with the asynchronous programming model that Netty uses.

The solution below mainly uses a Netty channel attribute and a LinkedBlockingQueue.

In your inbound handler:

@ChannelHandler.Sharable
public class ClientInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            Channel channel = ctx.channel();
            // attr() never returns null, so check the stored value instead
            SensibleRelay relay = channel.attr(ChannelAttributeKeys.RELAY).get();
            if (null == relay) {
                return;
            }

            FullHttpResponse httpResponse = (FullHttpResponse) msg;
            ByteBuf content = httpResponse.content();

            boolean offered = relay.offerResponse(content.toString(StandardCharsets.UTF_8));
            assert offered;
        } finally {
            // The message is consumed here, so release its buffer (io.netty.util.ReferenceCountUtil)
            ReferenceCountUtil.release(msg);
        }
    }
}

In your netty client,

SensibleRelay relay = new SensibleRelay();
future.addListener(new FutureListener<Channel>() {
    @Override
    public void operationComplete(Future<Channel> f) throws Exception {
        if (f.isSuccess()) {
            Channel channel = f.getNow();
            
            channel.attr(ChannelAttributeKeys.RELAY).set(relay);

            channel.writeAndFlush(request);
        } 
    }
});

return relay.takeResponse();

And here is the SensibleRelay class

public class SensibleRelay {

    final BlockingQueue<String> answer = new LinkedBlockingQueue<String>(1);

    public String takeResponse() {
        boolean interrupted = false;
        try {
            for (;;) {
                try {
                    return answer.take();
                } catch (InterruptedException ignore) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public boolean offerResponse(String response) {
        return answer.offer(response);
    }

}

Hope this will help you.
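
One thing to keep in mind with a blocking take() is that the caller waits forever if the server never answers. A possible variation, sketched here with the JDK only, is to wait with a timeout instead. The class name TimedRelay and the choice to return null on timeout are assumptions made for illustration, not part of the answer above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical variant of the relay that gives up after a timeout. */
class TimedRelay {
    // Capacity 1: a relay carries at most one response at a time
    private final BlockingQueue<String> answer = new LinkedBlockingQueue<>(1);

    /** Called from the inbound handler; returns false if a response is already queued. */
    boolean offerResponse(String response) {
        return answer.offer(response);
    }

    /** Waits up to the given time; returns null on timeout or if the wait is interrupted. */
    String takeResponse(long timeout, TimeUnit unit) {
        try {
            return answer.poll(timeout, unit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

As with any blocking call, takeResponse must be invoked from an application thread, never from a Netty event loop thread.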

Whiteman answered 11/4, 2023 at 7:33 Comment(0)
