Tuesday, November 10, 2009

Java : A callback API for epoll(), building on top of Java NIO


I just finished the first draft of a Java callback API that lets you do asynchronous socket communication using the scalable epoll() mechanism of Linux. It is built on top of Java NIO, so it takes advantage of the best underlying network mechanism the OS has to offer. On Linux 2.6+ this will likely be epoll(). (FreeBSD has no epoll(); its counterpart is kqueue.)

Use the API if you need to get data from a large number of hosts quickly without the overhead of a thread for each host/connection. A thread per connection becomes a serious problem as the number of simultaneous connections increases, especially if memory is scarce.

The Java NIO API can certainly be used for this purpose. But using that API is somewhat difficult, and there are various caveats you have to guard against. You need to understand the Selector, SelectionKey and SocketChannel classes, the interest ops (the bit mask set through SelectionKey.interestOps()) and how they all interact. You need to know when to cancel SelectionKeys and how to set the interest ops appropriately. You also need to understand the cryptic ByteBuffer class and its variants, and possibly how ByteArrayOutputStream works as well. You need to figure out how to use these classes to store the data separately for each connection, keeping in mind that data from different hosts will likely arrive interleaved. Finally, you must not call select() once all sockets have been completely read, or select() will block, which means you have to keep track of pending hosts.
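Much of the ByteBuffer confusion comes down to its two modes: filling (after clear()) and draining (after flip()). The sketch below is only an illustration and is not part of the framework; the class name ByteBufferDemo and its methods are made up for this example. It pushes data through a deliberately small buffer and moves each full chunk into a ByteArrayOutputStream, which is the same accumulation pattern the framework relies on. It assumes Java 7 or later for StandardCharsets.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferDemo {

    // Copies everything written to buf so far into out, then resets buf for refilling.
    public static void drain(ByteBuffer buf, ByteArrayOutputStream out) {
        buf.flip();                               // filling -> draining: limit = position, position = 0
        byte[] chunk = new byte[buf.remaining()];
        buf.get(chunk);
        out.write(chunk, 0, chunk.length);
        buf.clear();                              // draining -> filling: position = 0, limit = capacity
    }

    // Pushes text through a buffer of bufSize bytes (bufSize must be > 0),
    // as if the data had arrived from a socket in small pieces.
    public static String roundTrip(String text, int bufSize) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(bufSize);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte b : data) {
            buf.put(b);
            if (!buf.hasRemaining()) {            // buffer full: move its contents aside
                drain(buf, out);
            }
        }
        drain(buf, out);                          // whatever is left in the last partial chunk
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("55145 09-11-10 23:13:25 UTC(NIST)", 8));
    }
}
```

Forgetting the flip() before reading, or the clear() before the next fill, is the usual source of empty or duplicated data.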

The callback framework I developed aims to relieve you of all these details. It lets you focus on the flow of calls between the client and each host along with the application logic once you get data from the hosts.

Usage:
Extend the abstract class ConnectionParam. (Let's name it ConnectionParamImpl)
Create a ConnectionParamImpl object for each connection and stuff these into an array.
Call ASyncSocketHandler.connect_all(array_of_connections) - the argument is the above array.

Since we extended the ConnectionParam class, we need to implement some methods. These are simply the callback methods that tell us that data has either been written to the network, or read from the network. This is where we get a chance to examine the data and do what we need to do from the application's point of view.

// each callback returns the next op we're interested in: OP_READ / OP_WRITE / OP_CLOSE
public abstract int connected_ok();
public abstract int connect_failed();
public abstract int write_ok(boolean eod);
public abstract int read_ok(boolean eod);



Each of the overridden methods needs to return an int signaling the next network operation you are interested in. The available options are:

ASyncSocketHandler.OP_READ
ASyncSocketHandler.OP_WRITE
ASyncSocketHandler.OP_CLOSE
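These return values are plain ints. OP_READ and OP_WRITE are the SelectionKey constants, while OP_CLOSE is a framework-specific value (128) that falls outside the bits SelectionKey defines. The sketch below shows how a dispatcher can tell the two kinds apart. The OpCodes class and its methods are hypothetical helpers written for this example, not part of the framework.

```java
import java.nio.channels.SelectionKey;

public class OpCodes {
    // Same value the framework uses for its close signal.
    public static final int OP_CLOSE = 128;

    // All interest-op bits that SelectionKey defines: 1 | 4 | 8 | 16.
    private static final int SELECTOR_OPS = SelectionKey.OP_READ | SelectionKey.OP_WRITE
            | SelectionKey.OP_CONNECT | SelectionKey.OP_ACCEPT;

    // True if op is made up only of bits that can be handed to SelectionKey.interestOps().
    public static boolean isSelectorOp(int op) {
        return op != 0 && (op & ~SELECTOR_OPS) == 0;
    }

    // Human-readable name for the value a callback returned.
    public static String describe(int op) {
        if (op == OP_CLOSE) return "close the connection";
        if (op == SelectionKey.OP_READ) return "wait until readable";
        if (op == SelectionKey.OP_WRITE) return "wait until writable";
        return "unexpected op " + op;
    }

    public static void main(String[] args) {
        System.out.println(describe(SelectionKey.OP_READ));
        System.out.println(describe(OP_CLOSE));
        System.out.println(isSelectorOp(OP_CLOSE));
    }
}
```

Because 128 is not a valid interest op, a dispatcher has to check for it before calling interestOps(); passing it through would raise an IllegalArgumentException.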



This is a simple usage scenario that gets output from a bunch of known time of day servers.

There are two classes you need to implement:

ConnectionParamImpl and ASyncEchoClient.
ConnectionParamImpl represents a connection to a single host.
ASyncEchoClient is the application entry point.

import java.io.UnsupportedEncodingException;

/**
* @author: thushara
* @version: Nov 8, 2009
*/
public class ConnectionParamImpl extends ConnectionParam {

    public ConnectionParamImpl(byte[] ip, int port) {
        super(ip, port);
    }

    // returns the next op we're interested in: OP_READ / OP_WRITE / OP_CLOSE
    // after connecting, we can proceed to read
    public int connected_ok() {
        return ASyncSocketHandler.OP_READ;
    }

    public int connect_failed() {
        return ASyncSocketHandler.OP_CLOSE;
    }

    // returns the next op we're interested in: OP_READ / OP_WRITE / OP_CLOSE
    // param: eod = true iff the complete request was written to the socket
    // if the request is not fully written, return the signal to write again;
    // else we can go on to read, so return the read signal.
    public int write_ok(boolean eod) {
        return eod ? ASyncSocketHandler.OP_READ : ASyncSocketHandler.OP_WRITE;
    }

    // param: eod = true iff all data was read from the socket and it has no more data
    // if all data is read, this socket won't return any more data, so
    // we signal that the socket should be closed;
    // else we signal that we want to continue reading.
    public int read_ok(boolean eod) {
        if (eod) {
            byte[] resp = getResponse();
            try {
                System.out.println(new String(resp, "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                System.out.println("bad chars - non utf-8");
            }
        }
        return eod ? ASyncSocketHandler.OP_CLOSE : ASyncSocketHandler.OP_READ;
    }

}



/**
* @author: thushara
* @version: Nov 8, 2009
*/
public class ASyncEchoClient {
    public static void main(String[] args) {

        ConnectionParam.verbose = true;
        ASyncSocketHandler.verbose = true;

        ASyncSocketHandler hndlr = new ASyncSocketHandler();
        int port = 13;

        String[] ips = {"64.90.182.55", "129.6.15.28", "129.6.15.29", "206.246.118.250", "64.236.96.53", "68.216.79.113", "208.66.175.36",
                "173.14.47.149", "64.113.32.5", "132.163.4.101", "132.163.4.102", "132.163.4.103", "192.43.244.18", "128.138.140.44",
                "128.138.188.172", "198.60.73.8", "131.107.13.100", "207.200.81.113", "69.25.96.13", "64.125.78.85", "64.147.116.229"};
        ConnectionParam[] params = new ConnectionParamImpl[ips.length];

        for (int j = 0; j < ips.length; j++) {
            // convert the dotted-quad string into the 4 raw address bytes
            String ip = ips[j];
            String[] sb = ip.split("\\.");
            byte[] bytes = new byte[sb.length];
            for (int i = 0; i < sb.length; i++) {
                bytes[i] = (byte) Integer.parseInt(sb[i]);
            }
            params[j] = new ConnectionParamImpl(bytes, port);
        }

        try {
            hndlr.connect_all(params);
        } catch (Exception e) {
            System.err.println("unexpected :" + e.getMessage());
            e.printStackTrace();
        }
    }
}
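
The client above turns each dotted address into raw bytes with a cast, which works because (byte) keeps the low 8 bits even for octets above 127. As a minimal sketch of that step in isolation, the hypothetical IpBytes helper below (not part of the framework) does the same conversion with some validation added, and uses InetAddress.getByAddress() to show that the bytes map back to the original address. No DNS lookup is involved when a raw 4-byte address is passed.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class IpBytes {

    // Parses an IPv4 dotted-quad string such as "129.6.15.28" into 4 raw bytes.
    public static byte[] parse(String dotted) {
        String[] parts = dotted.split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("expected 4 octets: " + dotted);
        }
        byte[] bytes = new byte[4];
        for (int i = 0; i < 4; i++) {
            int octet = Integer.parseInt(parts[i]);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("octet out of range: " + octet);
            }
            bytes[i] = (byte) octet;   // values above 127 wrap to negative bytes; the bits are unchanged
        }
        return bytes;
    }

    // Formats 4 raw bytes back into dotted-quad form.
    public static String format(byte[] bytes) {
        try {
            return InetAddress.getByAddress(bytes).getHostAddress();
        } catch (UnknownHostException e) {
            // thrown only when the array length is not a legal address length
            throw new IllegalArgumentException("not a valid address length: " + bytes.length, e);
        }
    }

    public static void main(String[] args) {
        byte[] raw = parse("129.6.15.28");
        System.out.println(raw[0]);        // negative, since 129 does not fit in a signed byte
        System.out.println(format(raw));
    }
}
```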


Notice that you don't see a single reference to a socket or a byte buffer here. All the communication details are wrapped in ASyncSocketHandler.java and ConnectionParam.java.

This is the output you should see, basically the time of day as reported by each host:


55145 09-11-10 23:13:25 00 0 0 434.7 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 401.7 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 398.1 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 402.8 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 383.4 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 386.1 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 348.4 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 347.4 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 348.2 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 347.9 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 339.2 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 316.3 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 306.9 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 310.7 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 313.4 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 298.7 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 295.4 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 274.6 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 268.1 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 268.0 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 275.9 UTC(NIST) *


This is the framework implementation. There are two files.

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CharacterCodingException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.net.InetSocketAddress;

/**
* @author: thushara
* @version: Nov 6, 2009
*/
public abstract class ConnectionParam {
    private static final int DEFAULT_RESP_BUF_SIZE = 1024;

    private byte[] ip;
    private int port;
    private ByteBuffer request;
    private ByteBuffer response;
    private int respBufSize;
    private ByteArrayOutputStream outStrm;
    private SocketChannel client;

    public static boolean verbose = false;

    public ConnectionParam(byte[] ip, int port) {
        this(ip, port, DEFAULT_RESP_BUF_SIZE);
    }

    public ConnectionParam(byte[] ip, int port, String request) throws CharacterCodingException {
        this(ip, port);
        setRequest(request);
    }

    public ConnectionParam(byte[] ip, int port, String request, int respBufSize) throws CharacterCodingException {
        this(ip, port, respBufSize);
        setRequest(request);
    }

    // all other constructors delegate here, so the response buffer is
    // always allocated with the requested size
    public ConnectionParam(byte[] ip, int port, int respBufSize) {
        this.ip = ip;
        this.port = port;
        this.respBufSize = respBufSize;
        response = ByteBuffer.allocate(respBufSize);
        outStrm = new ByteArrayOutputStream();
    }

    public void setRequest(String request) throws CharacterCodingException {
        Charset charset = Charset.forName("UTF-8");
        CharsetEncoder encoder = charset.newEncoder();
        this.request = encoder.encode(CharBuffer.wrap(request));
    }

    // returns everything read so far: the bytes already moved to outStrm
    // plus whatever is still sitting in the response buffer
    public byte[] getResponse() {
        int respSize = outStrm.size() + response.position();
        byte[] out = new byte[respSize];
        System.arraycopy(outStrm.toByteArray(), 0, out, 0, outStrm.size());

        // copy the buffered bytes without disturbing the buffer's write position
        int prevPos = response.position();
        response.flip();
        response.get(out, outStrm.size(), response.limit());
        response.position(prevPos);
        response.limit(response.capacity());

        return out;
    }

    private InetAddress getAddr() {
        try {
            return InetAddress.getByAddress(ip);
        } catch (UnknownHostException e) {
            return null;
        }
    }

    // moves the contents of the response buffer into outStrm
    private void store() throws IOException {
        response.flip();
        byte[] bytes = new byte[response.limit()];
        response.get(bytes);
        outStrm.write(bytes, 0, bytes.length);
        response.clear(); //initialize for reading from socket again
    }

    public SocketChannel getSocketChannel() throws IOException {
        if (client != null) return client;

        client = SocketChannel.open();
        client.configureBlocking(false);
        client.connect(new InetSocketAddress(getAddr(), port));
        return client;
    }

    public String toString() {
        InetAddress addr = null;
        try {
            addr = InetAddress.getByAddress(ip);
        } catch (UnknownHostException e) {
            return "";
        }
        return addr.toString() + ":" + port;
    }

    //returns true if all data is written to the socket
    public boolean write() throws IOException {
        if (request == null) {
            // no request was set, so there is nothing to send
            if (verbose) {
                System.err.println("nothing to write to server");
            }
            return true;
        }
        int numBytes = getSocketChannel().write(request);
        if (numBytes == 0 && verbose) {
            System.err.println("nothing to write to server");
        }
        return !request.hasRemaining();
    }

    //returns true if all data is read, our end of the socket should be closed
    public boolean read() throws IOException {
        int numBytes = 0;
        try {
            numBytes = getSocketChannel().read(response);
        } catch (IOException e) {
            // treat a failed read as end of data
            store();
            return true;
        }
        if (numBytes == 0 && verbose) {
            System.err.println("nothing to read from server");
        }
        if (numBytes == -1) {
            // end of data
            store();
            return true;
        }

        // buffer is full: move its contents aside and read again from the socket
        while (response.remaining() == 0 && numBytes > 0) {
            store();
            numBytes = getSocketChannel().read(response);
        }
        if (numBytes == -1) {
            // the peer closed the connection while we were draining
            store();
            return true;
        }

        return false;
    }

    // each callback returns the next op we're interested in: OP_READ / OP_WRITE / OP_CLOSE
    public abstract int connected_ok();
    public abstract int connect_failed();
    public abstract int write_ok(boolean eod);
    public abstract int read_ok(boolean eod);
}



import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.SelectionKey;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.Iterator;
import java.net.ConnectException;

/**
* @author: thushara
* @version: Nov 6, 2009
*/
public class ASyncSocketHandler {
    private Selector selector;
    private HashSet<ConnectionParam> clients = new HashSet<ConnectionParam>();
    private int timeout; // milliseconds; 0 (the default) makes select() block until a socket is ready

    public static final int OP_READ = SelectionKey.OP_READ;
    public static final int OP_WRITE = SelectionKey.OP_WRITE;
    public static final int OP_CLOSE = 128; //leave enough room for possible expansion of the SelectionKey op constants

    public static boolean verbose = false;

    public void connect(ConnectionParam connParm) throws IOException {
        SocketChannel client = connParm.getSocketChannel();
        SelectionKey clientKey = client.register(selector, SelectionKey.OP_CONNECT);
        clientKey.attach(connParm);
        clients.add(connParm);
    }

    public void connect_all(ConnectionParam[] connParms) throws IOException {
        selector = Selector.open();
        for (ConnectionParam connParm : connParms) {
            connect(connParm);
        }

        while (clients.size() > 0) {
            if (verbose) {
                System.out.println("waiting for sockets to be ready. currently waiting for:");
                dumpClients();
            }
            int numReady = 0;

            try {
                numReady = selector.select(timeout);
            } catch (IOException e) {
                if (verbose) {
                    System.err.println("select failed, unrecoverable!" + e.getMessage());
                }
                break;
            }
            if (numReady <= 0) {
                if (verbose) {
                    System.err.println("No sockets ready for " + timeout / 1000 + " secs, consider dead remotes, bailing...");
                }
                break;
            }

            Set<SelectionKey> keys = selector.selectedKeys(); // can't recover from exception thrown here
            Iterator<SelectionKey> i = keys.iterator();

            while (i.hasNext()) {
                SelectionKey key = i.next();
                i.remove();
                ConnectionParam connParm = (ConnectionParam) key.attachment();
                try {
                    SocketChannel channel = (SocketChannel) key.channel();
                    if (key.isConnectable() && channel.isConnectionPending()) {
                        boolean ok = false;
                        try {
                            ok = channel.finishConnect();
                        } catch (IOException e) {
                            // refused, unreachable, timed out, ...
                            if (verbose) {
                                System.err.println("error processing " + connParm.toString());
                                e.printStackTrace();
                            }
                        }

                        if (!ok) {
                            if (verbose) {
                                System.err.println("not connected, giving up");
                            }
                            // notify the application first, then clean up
                            connParm.connect_failed();
                            close(key, connParm);
                            continue;
                        }
                        applyOp(key, connParm, connParm.connected_ok());
                    }
                    // a key closed in an earlier step must not be queried again
                    if (key.isValid() && key.isWritable()) {
                        boolean completedWrite = connParm.write();
                        applyOp(key, connParm, connParm.write_ok(completedWrite));
                    }
                    if (key.isValid() && key.isReadable()) {
                        boolean endOfData = connParm.read();
                        int op = connParm.read_ok(endOfData);
                        applyOp(key, connParm, endOfData ? OP_CLOSE : op);
                    }
                } catch (IOException e) {
                    // drop the broken connection so the loop does not wait on it forever
                    if (verbose) {
                        System.err.println("error processing " + connParm.toString() + ": " + e.getMessage());
                    }
                    close(key, connParm);
                }
            }
        }
        selector.close();
    }

    // either closes the connection or registers interest in the next op
    private void applyOp(SelectionKey key, ConnectionParam connParm, int op) {
        if (op == OP_CLOSE) {
            close(key, connParm);
        } else {
            key.interestOps(op);
        }
    }

    private void close(SelectionKey key, ConnectionParam connParm) {
        try {
            key.channel().close();
        } catch (IOException x) {
            // nothing more we can do for this socket
        }
        key.cancel();
        clients.remove(connParm);
    }

    private void dumpClients() {
        for (ConnectionParam connParm : clients) {
            System.out.println(connParm.toString());
        }
    }

    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

}



I'm making the project available here:

Comments:

Pankaj Gupta said...

Hi,

Thanks for the good article.

How can I write some sample data using the write method (in ConnectionParam) through the run method in ASyncSocketHandler?

It will be of great help.

Thanks.

Shyam Prasad said...

Good Article.

How can I make the ASyncEchoClient class read as well as write data, using single or multiple threads?

Thanks