Thursday, June 23, 2011

Asynchronous UDP server using Java NIO

UDP is a lightweight protocol compared to TCP. When the data transmitted is small (a few hundred bytes) and an occasional loss of data is not critical, UDP can be used to improve the throughput of a program.

The native C sockets API provides epoll, available since the Linux 2.6 kernels, which can be used for both TCP and UDP sockets. In an earlier post, I described a framework that can be used to implement an asynchronous client that connects to multiple servers using TCP. I found several code examples that described how Java NIO can be used for this purpose. It turns out that it is even simpler to write an NIO server for UDP.

I would not recommend writing a UDP server if the request/response cannot be transmitted in a single UDP packet or if a packet has a dependency on an earlier packet. UDP packets can arrive out of order and the headers have no sequence numbers to enable re-ordering. If you want to handle reordering, you will be implementing what TCP provides for this purpose and it is probably a better idea to stick with TCP.

The following program does well when the request/response fits in a single UDP packet. 512 bytes is generally considered the safe maximum payload size, and classic DNS (without extensions such as EDNS0) limits its messages to 512 bytes when it uses UDP.
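A size check along these lines can catch a request or response that would not fit before it is sent. This is only a sketch: the class name, the method name and the use of 512 as a hard limit are my own assumptions for illustration and are not part of the server below.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the server listing.
final class DatagramSizeCheck {
    // Conservative payload limit discussed in the text (assumed here as a hard cap).
    static final int SAFE_PAYLOAD_BYTES = 512;

    // Returns true if the UTF-8 encoding of msg fits within the limit.
    // The check is on encoded bytes, not characters, because multi-byte
    // characters make the two counts differ.
    static boolean fitsInOneDatagram(String msg) {
        return msg.getBytes(StandardCharsets.UTF_8).length <= SAFE_PAYLOAD_BYTES;
    }

    public static void main(String[] args) {
        System.out.println(fitsInOneDatagram("send the same string"));
    }
}
```

A server could run such a check before queuing a response and fall back to an error reply, or to TCP, when it fails.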

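import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.Charset;
import java.util.Iterator;

public class ASyncUDPSvr {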
    static int BUF_SZ = 1024;

    class Con {
        ByteBuffer req;
        ByteBuffer resp;
        SocketAddress sa;

        public Con() {
            req = ByteBuffer.allocate(BUF_SZ);
        }
    }

    static int port = 8340;
    private void process() {
        try {
            Selector selector = Selector.open();
            DatagramChannel channel = DatagramChannel.open();
            InetSocketAddress isa = new InetSocketAddress(port);
            channel.socket().bind(isa);
            channel.configureBlocking(false);
            SelectionKey clientKey = channel.register(selector, SelectionKey.OP_READ);
            clientKey.attach(new Con());
            while (true) {
                try {
                    selector.select();
                    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
                    while (selectedKeys.hasNext()) {
                        try {
                            SelectionKey key = selectedKeys.next();
                            selectedKeys.remove();

                            if (!key.isValid()) {
                              continue;
                            }

                            if (key.isReadable()) {
                                read(key);
                                key.interestOps(SelectionKey.OP_WRITE);
                            } else if (key.isWritable()) {
                                write(key);
                                key.interestOps(SelectionKey.OP_READ);
                            }
                        } catch (IOException e) {
                            System.err.println("glitch, continuing... " +(e.getMessage()!=null?e.getMessage():""));
                        }
                    }
                } catch (IOException e) {
                    System.err.println("glitch, continuing... " +(e.getMessage()!=null?e.getMessage():""));
                }
            }
        } catch (IOException e) {
            System.err.println("network error: " + (e.getMessage()!=null?e.getMessage():""));
        }
    }

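    private void read(SelectionKey key) throws IOException {
        DatagramChannel chan = (DatagramChannel)key.channel();
        Con con = (Con)key.attachment();
        con.req.clear();                 // reset the buffer; it is reused for every datagram
        con.sa = chan.receive(con.req);  // null if no datagram was actually available
        con.req.flip();                  // limit now marks the end of the received bytes
        // Decode only the bytes received, not the whole backing array.
        System.out.println(new String(con.req.array(), 0, con.req.limit(), "UTF-8"));
        con.resp = Charset.forName("UTF-8").newEncoder().encode(CharBuffer.wrap("send the same string"));
    }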

    private void write(SelectionKey key) throws IOException {
        DatagramChannel chan = (DatagramChannel)key.channel();
        Con con = (Con)key.attachment();
        if (con.sa != null) {  // no sender is recorded if the last receive() returned null
            chan.send(con.resp, con.sa);
        }
    }

    static public void main(String[] args) {
        ASyncUDPSvr svr = new ASyncUDPSvr();
        svr.process();
    }
}
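
To try the server out, a small blocking client is enough. The following is a minimal sketch; the class name UdpTestClient, the request() helper and the two-second timeout are my own assumptions, and it uses the plain DatagramSocket API rather than NIO. The port in main() matches the port used in the listing above.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical test client, not part of the server listing.
final class UdpTestClient {
    // Sends msg as one datagram and waits up to timeoutMs for one reply.
    // Throws SocketTimeoutException (an IOException) if no reply arrives.
    static String request(InetSocketAddress server, String msg, int timeoutMs) throws IOException {
        try (DatagramSocket socket = new DatagramSocket()) {
            socket.setSoTimeout(timeoutMs);
            byte[] out = msg.getBytes(StandardCharsets.UTF_8);
            socket.send(new DatagramPacket(out, out.length, server));

            byte[] in = new byte[1024];
            DatagramPacket reply = new DatagramPacket(in, in.length);
            socket.receive(reply);
            // Decode only the bytes that were actually received.
            return new String(reply.getData(), reply.getOffset(), reply.getLength(),
                              StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        InetSocketAddress server = new InetSocketAddress("localhost", 8340);
        System.out.println(request(server, "hello", 2000));
    }
}
```

The timeout matters because UDP gives no delivery guarantee: if either datagram is lost, the client would otherwise wait forever.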

When dealing with small data sizes that fit in one packet, a readiness signal from NIO means that the whole request is available, because a datagram is delivered in its entirety or not at all. Thus the protocol does not need to worry about accumulating network data in buffers. We still need an object, attached to the SelectionKey, that carries the client's address and the response from the read to the write, as the reading and writing happen in two distinct parts of the code.

First, after establishing our UDP socket locally on the server, we register interest in read events with NIO. When NIO wakes us up, via the select() call, we can immediately read the full request made by the client. At this point, we form our response but do not want to write it back to the network right away, as the kernel buffers may be full and the send may not go through. So, we store the response on the object attached to the SelectionKey, tell NIO that we are now interested in writing, and go back to our select() loop.

Next, when NIO wakes us up from the select() call, we can proceed to write. Again, since the data fits in one packet, a single send() call transmits the whole response; there is no partial write to resume.
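
For a non-blocking channel, DatagramChannel.send() returns the number of bytes sent, which is either the whole datagram or zero when the output buffer has no room. The listing above ignores that return value. A defensive variant could check it, as in the sketch below; the class name SendCheck and the trySend() helper are hypothetical names of mine.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the server listing.
final class SendCheck {
    // Returns true if the whole datagram was handed to the kernel,
    // false if nothing was sent (resp is then left untouched).
    static boolean trySend(DatagramChannel chan, ByteBuffer resp, SocketAddress to)
            throws IOException {
        int expected = resp.remaining();
        int sent = chan.send(resp, to);
        return sent == expected;
    }

    // Loopback demonstration: one channel receives, a non-blocking one sends.
    public static void main(String[] args) throws IOException {
        try (DatagramChannel rx = DatagramChannel.open();
             DatagramChannel tx = DatagramChannel.open()) {
            rx.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            tx.configureBlocking(false);
            ByteBuffer resp = ByteBuffer.wrap(
                    "send the same string".getBytes(StandardCharsets.UTF_8));
            System.out.println("sent: " + trySend(tx, resp, rx.getLocalAddress()));
        }
    }
}
```

In the server, a false result would mean leaving the key's interest set at OP_WRITE and trying again on the next wake-up, instead of switching back to OP_READ.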

However, UDP does not benefit from epoll the way TCP does. A UDP server does not get a separate socket for each new client, so the selector only ever watches the single server socket. Each new client sends its datagrams to the single UDP receive buffer of the server.

A threaded server that does not use epoll might be more advantageous. Each thread could wait on the single server socket, using a receive() call. The kernel will ensure that only one thread wakes up from the receive() call for each datagram. I hope to write such an implementation and measure both designs.
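
The threaded design could be sketched as follows. This is my own untested-for-performance illustration, not a measured implementation: the class name ThreadedUdpServer, the worker count of 4 and the fixed response are assumptions, and it uses the blocking DatagramSocket API. Depending on the JDK version, concurrent receive() calls on one socket may also be serialized inside the JVM, so only the processing after receive() is guaranteed to run in parallel.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;

// Hypothetical threaded variant of the server.
final class ThreadedUdpServer {
    static final int BUF_SZ = 1024;
    private final DatagramSocket socket;

    // Port 0 asks the OS for any free port.
    ThreadedUdpServer(int port) throws SocketException {
        socket = new DatagramSocket(port);
    }

    int port() {
        return socket.getLocalPort();
    }

    // Starts the given number of daemon threads, all blocking on the same socket.
    void start(int workers) {
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(this::serve, "udp-worker-" + i);
            t.setDaemon(true);
            t.start();
        }
    }

    private void serve() {
        byte[] buf = new byte[BUF_SZ]; // per-thread buffer, so no sharing between workers
        byte[] resp = "send the same string".getBytes(StandardCharsets.UTF_8);
        while (!socket.isClosed()) {
            try {
                DatagramPacket req = new DatagramPacket(buf, buf.length);
                socket.receive(req); // blocks until a datagram arrives for this thread
                System.out.println(new String(req.getData(), 0, req.getLength(),
                                              StandardCharsets.UTF_8));
                socket.send(new DatagramPacket(resp, resp.length, req.getSocketAddress()));
            } catch (IOException e) {
                if (!socket.isClosed()) {
                    System.err.println("glitch, continuing... " + e.getMessage());
                }
            }
        }
    }

    // Closing the socket makes blocked receive() calls fail and the workers exit.
    void stop() {
        socket.close();
    }

    public static void main(String[] args) throws Exception {
        ThreadedUdpServer svr = new ThreadedUdpServer(8340);
        svr.start(4);
        Thread.currentThread().join(); // workers are daemon threads; keep the JVM alive
    }
}
```

Because each worker owns its buffer and reads the client address from the packet itself, no per-client object needs to be carried between a read phase and a write phase.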

4 comments:

Anonymous said...

I know this post is old. I can't believe it doesn't have any comments. It's the most useful, concise page I've found about Java NIO. Thanks.

Anonymous said...

What about UDP NIO client?

Anonymous said...

Yes, looking forward to see a non blocking sample.

Anonymous said...

Thanks a lot for this post. It was really useful to work with UDP async api.