Thursday, November 19, 2009

Java: building an asynchronous web page fetcher


Today I used the epoll()-based framework I built earlier (FishHooks) to fetch web pages. FishHooks is protocol-agnostic, so most of the code in the fetcher deals with HTTP: sending the proper request, parsing the response, and so on.

There are, as usual, two files. The driver program ASyncWgetClient.java uses FishHooks to start the epoll() loop. WgetParam.java extends ConnectionParam and implements the state machine specific to the HTTP request/response cycle. Among other things, it handles the 30x redirects.


import java.util.List;
import java.util.ArrayList;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.CharacterCodingException;

/**
 * Driver for the asynchronous page fetcher: resolves a fixed list of URLs and fetches
 * them all concurrently through one ASyncSocketHandler event loop, using WgetParam for
 * the per-connection HTTP logic.
 *
 * @author: thushara
 * @version: Nov 18, 2009
 */
public class ASyncWgetClient {

    /** Maximum time to wait for one DNS lookup before treating the host as unresolvable. */
    private static final long DNS_TIMEOUT_MS = 1000;

    /**
     * Resolves every URL, builds an HTTP/1.0 GET request for each one that can be fetched
     * and runs all of them in a single event loop. URLs that are malformed, use https,
     * fail DNS resolution, or whose request cannot be encoded are reported through
     * WgetParam.writeErrorFile and skipped; the remaining URLs are still fetched.
     */
    public void process() {

        // A URL that passed validation and DNS resolution, ready to become a request.
        class DNSBlob {
            String url;
            String path;
            // Value for the Host header: the host name, plus ":port" for a non-default port.
            String host;
            int port;
            URLWithRetries retryURL;
            byte[] inetAddr;

            public DNSBlob(String url, String path, String host, int port, URLWithRetries retryURL, byte[] inetAddr) {
                this.url = url;
                this.path = path;
                this.host = host;
                this.port = port;
                this.retryURL = retryURL;
                this.inetAddr = inetAddr;
            }
        }

        //ConnectionParam.verbose = true;
        //ASyncSocketHandler.verbose = true;

        ASyncSocketHandler hndlr = new ASyncSocketHandler();

        String[] urls = {"http://zerohedge.com", "http://moneycentral.msn.com", "http://www.nakedcapitalism.com",
                "http://youtube.com", "http://democracynow.org", "http://love.com", "http://anncoulter.com",
                "http://peace.org", "http://grunge.com", "http://rockandroll.com", "http://shopping.com",
                "http://luck.com", "http://schools.org", "http://whipitthemovie.com",
                "http://war.org", "http://benstiller.com", "http://www.oprahwinfrey.com", "http://porche.com",
                "http://spanish.com", "http://healingart.com", "http://flash.com"};

        List<DNSBlob> dnsList = new ArrayList<DNSBlob>();

        for (String url : urls) {
            URLWithRetries retryURL = new URLWithRetries(url);
            URL urlObj;
            try {
                urlObj = new URL(url);
            } catch (MalformedURLException e) {
                WgetParam.writeErrorFile(WgetParam.getErrorFileName(url), url + " is not correct, skipping...");
                continue;
            }
            if (urlObj.getProtocol().equals("https")) {
                WgetParam.writeErrorFile(WgetParam.getErrorFileName(url), url + " not handling https, skipping...");
                continue;
            }
            String host = urlObj.getHost();
            String path = urlObj.getPath();
            if (urlObj.getQuery() != null) path += "?" + urlObj.getQuery();
            if (path.length() == 0) path = "/";
            // getPort() is -1 when the URL has no explicit port; only then use the scheme default.
            int explicitPort = urlObj.getPort();
            int port = explicitPort != -1 ? explicitPort : urlObj.getDefaultPort();
            String hostHeader = (explicitPort == -1 || explicitPort == urlObj.getDefaultPort())
                    ? host
                    : host + ":" + port;

            byte[] inetAddr = resolve(host);
            if (inetAddr == null) {
                WgetParam.writeErrorFile(WgetParam.getErrorFileName(url), host + " has no DNS entry");
                continue;
            }

            dnsList.add(new DNSBlob(url, path, hostHeader, port, retryURL, inetAddr));
        }

        // Only connections that were actually built are passed on, so the array handed to
        // connect_all() contains no null slots.
        List<ConnectionParam> params = new ArrayList<ConnectionParam>(dnsList.size());
        for (DNSBlob dnsBlob : dnsList) {
            String request = "GET " + dnsBlob.path + " HTTP/1.0\r\nHost: " + dnsBlob.host + "\r\nAccept: */*\r\n\r\n";
            try {
                params.add(new WgetParam(dnsBlob.inetAddr, dnsBlob.port, request, dnsBlob.retryURL, hndlr));
            } catch (CharacterCodingException e) {
                WgetParam.writeErrorFile(WgetParam.getErrorFileName(dnsBlob.url), e.getMessage());
            }
        }

        try {
            hndlr.connect_all(params.toArray(new ConnectionParam[params.size()]));
        } catch (Exception e) {
            System.err.println("unexpected :" + e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * Resolves host on a helper thread, waiting at most DNS_TIMEOUT_MS for the answer.
     *
     * @return the raw address bytes, or null if the lookup failed or did not finish in time
     */
    private byte[] resolve(String host) {
        DNSResolver dnsRes = new DNSResolver(host);
        Thread t = new Thread(dnsRes, "dns-" + host);
        // A lookup that outlives the timeout is abandoned; as a daemon thread it cannot
        // keep the JVM alive once the fetch is over.
        t.setDaemon(true);
        t.start();
        try {
            t.join(DNS_TIMEOUT_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for our caller
        }
        return dnsRes.get();
    }

    /**
     * Runnable wrapper around InetAddress.getByName so the blocking lookup can run on its
     * own thread. get() returns null until the lookup succeeds (and forever if it fails).
     */
    public class DNSResolver implements Runnable {
        private String domain;
        private byte[] inetAddr;

        public DNSResolver(String domain) {
            this.domain = domain;
        }

        public void run() {
            try {
                byte[] addr = InetAddress.getByName(domain).getAddress();
                set(addr);
            } catch (UnknownHostException e) {
                // unresolvable: leave the result null so the caller skips this host
            }
        }

        public synchronized void set(byte[] inetAddr) {
            this.inetAddr = inetAddr;
        }

        public synchronized byte[] get() {
            return inetAddr;
        }
    }

    public static void main(String[] args) {
        ASyncSocketHandler.verbose = true;
        ASyncWgetClient aWget = new ASyncWgetClient();
        aWget.process();
    }
}




import java.io.*;
import java.nio.charset.CharacterCodingException;
import java.nio.channels.FileChannel;
import java.nio.MappedByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Per-connection HTTP logic for the asynchronous page fetcher: sends a GET request,
 * reads the whole response and then, depending on the status code, saves the body,
 * follows a redirect, retries, or records an error file.
 *
 * @author: thushara
 * @version: Nov 18, 2009
 */
public class WgetParam extends ConnectionParam {

    /** Directory that receives fetched pages (*.url) and error notes (*.bad). */
    public static final String OUTPUT_DIR = "/Users/thushara/affinity3/";

    private URLWithRetries retryURL;
    private ASyncSocketHandler hndlr;

    /**
     * @param ip       raw address of the server to connect to
     * @param port     server port
     * @param request  complete HTTP request to send once connected
     * @param retryURL URL being fetched, with its redirect/retry bookkeeping
     * @param hndlr    event loop that follow-up connections (redirects, 408 retries) are registered with
     * @throws CharacterCodingException propagated from ConnectionParam when request cannot be encoded
     */
    public WgetParam(byte[] ip, int port, String request, URLWithRetries retryURL, ASyncSocketHandler hndlr) throws CharacterCodingException {
        super(ip, port, request);
        this.retryURL = retryURL;
        this.hndlr = hndlr;
    }

    // After connecting, proceed to send the GET request.
    public int connected_ok() {
        return ASyncSocketHandler.OP_WRITE;
    }

    public int connect_failed() {
        return ASyncSocketHandler.OP_CLOSE;
    }

    // eod is true iff the complete request has been written to the socket: then switch to
    // reading; otherwise ask to write again.
    public int write_ok(boolean eod) {
        return eod ? ASyncSocketHandler.OP_READ : ASyncSocketHandler.OP_WRITE;
    }

    // eod is true iff the server has no more data. Once everything has arrived the response
    // is handled and the socket closed; otherwise keep reading.
    public int read_ok(boolean eod) {
        if (eod) {
            handleResponse(getResponse());
        }
        return eod ? ASyncSocketHandler.OP_CLOSE : ASyncSocketHandler.OP_READ;
    }

    /** Acts on a complete HTTP response according to its status code. */
    private void handleResponse(byte[] resp) {
        String origURL = retryURL.getOrigURL();
        String errFile = getErrorFileName(origURL);
        int code = parseStatusCode(resp);
        if (code < 0) {
            writeErrorFile(errFile, "malformed or empty HTTP response");
            return;
        }
        switch (code) {
            case 200:
            case 403:
            case 404:
            case 400:
                try {
                    writeFileWithoutHeaders(resp, OUTPUT_DIR + getFileName(origURL));
                } catch (IOException e) {
                    writeErrorFile(errFile, e.getMessage());
                }
                break;
            case 301:
            case 302:
            case 303:
            case 305: // Location: contains proxy, check if it is full url or just the proxy server name
            case 307: {
                if (retryURL.hitLimit()) {
                    writeErrorFile(errFile, "URL attempts too many redirections - giving up");
                    break;
                }
                String redirURL = getLocation(resp, retryURL.getDomain());
                if (redirURL == null) {
                    writeErrorFile(errFile, "redirect (" + code + ") without a Location header");
                } else {
                    addURL(redirURL);
                }
                break;
            }
            case 408: // request timed out, try again
                addURL(retryURL.url);
                break;
            default:
                writeErrorFile(errFile, "unexpected response code: " + code);
                break;
        }
    }

    /**
     * Reads the three-digit status code from a status line such as "HTTP/1.0 200 OK"
     * (digits at offsets 9-11).
     *
     * @return the code, or -1 if resp is too short or those bytes are not digits
     */
    private static int parseStatusCode(byte[] resp) {
        if (resp == null || resp.length < 12) return -1;
        int code = 0;
        for (int i = 9; i < 12; i++) {
            int digit = resp[i] - '0';
            if (digit < 0 || digit > 9) return -1;
            code = code * 10 + digit;
        }
        return code;
    }

    /**
     * Advances retryURL to redirURL and registers a new connection for it with hndlr.
     * Problems are written to the error file of the original URL.
     * NOTE: the DNS lookup here is blocking and this runs from read_ok, i.e. inside the
     * event loop, so a slow lookup stalls every other connection until it returns.
     */
    private void addURL(String redirURL) {
        retryURL.nextURL(redirURL);
        String errFile = getErrorFileName(retryURL.getOrigURL());
        URL url;
        try {
            url = new URL(retryURL.url);
        } catch (MalformedURLException e) {
            writeErrorFile(errFile, e.getMessage());
            return;
        }
        if (url.getProtocol().equals("https")) {
            writeErrorFile(errFile, url + " not handling https, skipping...");
            return;
        }
        String host = url.getHost();
        String path = url.getPath();
        if (url.getQuery() != null) path += "?" + url.getQuery();
        if (path.length() == 0) path = "/";
        // getPort() is -1 when the URL has no explicit port; only then use the scheme default.
        int explicitPort = url.getPort();
        int port = explicitPort != -1 ? explicitPort : url.getDefaultPort();
        String hostHeader = (explicitPort == -1 || explicitPort == url.getDefaultPort())
                ? host
                : host + ":" + port;
        InetAddress inetAddr;
        try {
            inetAddr = InetAddress.getByName(host);
        } catch (UnknownHostException e) {
            writeErrorFile(errFile, e.getMessage());
            return;
        }
        String request = "GET " + path + " HTTP/1.0\r\nHost: " + hostHeader + "\r\nAccept: */*\r\n\r\n";
        ConnectionParam connParm;
        try {
            connParm = new WgetParam(inetAddr.getAddress(), port, request, retryURL, hndlr);
        } catch (CharacterCodingException e) {
            writeErrorFile(errFile, e.getMessage());
            return;
        }
        try {
            hndlr.connect(connParm);
        } catch (ASyncSocketHandler.SelectorNotOpenException e) {
            writeErrorFile(errFile, "event loop is not running; cannot follow " + retryURL.url);
        } catch (IOException e) {
            writeErrorFile(errFile, e.getMessage());
        }
    }

    /** Returns the output file name for url: the hex MD5 of its UTF-8 bytes plus ".url". */
    private static String getFileName(String url) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            // Hash every encoded byte; url.length() counts chars, not UTF-8 bytes.
            byte[] hash = md.digest(url.getBytes("UTF-8"));
            StringBuilder name = new StringBuilder(hash.length * 2 + 4);
            for (byte b : hash) {
                name.append(String.format("%02x", b & 0xff));
            }
            return name.append(".url").toString();
        } catch (NoSuchAlgorithmException e) {
            System.err.println(e);
            e.printStackTrace();
            System.exit(-1);
        } catch (UnsupportedEncodingException e) {
            System.err.println(e);
            e.printStackTrace();
            System.exit(-1);
        }
        return null;
    }

    /**
     * Finds the Location header in the response headers and makes it absolute.
     * A relative location is appended to domain (presumably scheme://host without a
     * trailing slash, as returned by URLWithRetries.getDomain() — TODO confirm).
     *
     * @return the redirect target, or null if there is no non-empty Location header
     */
    private static String getLocation(byte[] httpHeader, String domain) {
        int lineStart = 0;
        for (int i = 0; i < httpHeader.length; i++) {
            if (httpHeader[i] != '\n') continue;
            int lineEnd = (i > lineStart && httpHeader[i - 1] == '\r') ? i - 1 : i;
            if (lineEnd == lineStart) break; // blank line: end of headers, no Location field
            String line = latin1(httpHeader, lineStart, lineEnd);
            lineStart = i + 1;

            int colon = line.indexOf(':');
            if (colon > 0 && line.substring(0, colon).trim().equalsIgnoreCase("Location")) {
                String location = line.substring(colon + 1).trim();
                if (location.length() == 0) return null;
                if (location.regionMatches(true, 0, "http://", 0, 7)
                        || location.regionMatches(true, 0, "https://", 0, 8)) {
                    return location;
                }
                return location.startsWith("/") ? domain + location : domain + "/" + location;
            }
        }
        return null;
    }

    /** Decodes bytes[from, to) as ISO-8859-1 (one char per byte, no sign extension). */
    private static String latin1(byte[] bytes, int from, int to) {
        StringBuilder sb = new StringBuilder(to - from);
        for (int i = from; i < to; i++) {
            sb.append((char) (bytes[i] & 0xff));
        }
        return sb.toString();
    }

    /**
     * Writes the part of an HTTP response after the header block to path.
     *
     * @throws IOException if the response has no blank line ending the headers, or on write failure
     */
    private void writeFileWithoutHeaders(byte[] bytes, String path) throws IOException {
        int bodyOffset = findBodyOffset(bytes);
        if (bodyOffset < 0) {
            throw new IOException("no end of headers found in response");
        }
        FileOutputStream out = new FileOutputStream(path);
        try {
            out.write(bytes, bodyOffset, bytes.length - bodyOffset);
        } finally {
            out.close();
        }
    }

    /**
     * Returns the index just past the first empty line ("\n" or "\r\n"), i.e. where the
     * body starts, or -1 if there is none.
     */
    private static int findBodyOffset(byte[] bytes) {
        int lineStart = 0;
        for (int i = 0; i < bytes.length; i++) {
            if (bytes[i] != '\n') continue;
            int len = i - lineStart;
            if (len == 0 || (len == 1 && bytes[i - 1] == '\r')) {
                return i + 1;
            }
            lineStart = i + 1;
        }
        return -1;
    }

    //two convenience methods to note errors, used from both WgetParam class and
    //ASyncWgetClient class.

    /** Returns the error-note file name for url: its ".url" name with the suffix replaced by ".bad". */
    public static String getErrorFileName(String url) {
        String name = getFileName(url);
        return name.substring(0, name.length() - 3) + "bad";
    }

    /** Writes error (or a placeholder if it is null) to OUTPUT_DIR + fName; failures are only logged. */
    public static void writeErrorFile(String fName, String error) {
        Writer out = null;
        try {
            out = new BufferedWriter(new FileWriter(OUTPUT_DIR + fName));
            // exception messages passed in here may be null
            out.write(error != null ? error : "unknown error");
        } catch (IOException e) {
            System.err.println("Error writing error file: " + e.getMessage());
        } finally {
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    System.err.println("Error writing error file: " + e.getMessage());
                }
            }
        }
    }
}



As you can see, there is a lot more code here than in the simpler client that talks to time-of-day servers. Most of it deals with the networking protocol; you still don't deal with sockets, channels or byte buffers directly.

Tuesday, November 10, 2009

Java : A callback API for epoll(), building on top of Java NIO


I just finished the first draft of a Java callback API that lets you do asynchronous socket communication using the scalable epoll() mechanism of Linux. It is built on top of Java NIO, so it takes advantage of the best network mechanism the underlying OS and JDK offer: on Linux 2.6+ this will likely be epoll(), while other systems use whatever selector implementation their JDK provides (for example poll(), or kqueue on the BSDs).

Use the API if you need to get data from a large number of hosts quickly without the overhead of a thread for each host/connection. Having a thread per connection causes serious problems as the number of simultaneous connections grows, especially if memory is scarce.

The Java NIO API can certainly be used for this purpose, but it is somewhat difficult to use and has various caveats you have to guard against. You need to understand the Selector, SelectionKey and SocketChannel classes, the interest-op flags (SelectionKey.OP_CONNECT, OP_READ, OP_WRITE), and how they interplay. You need to know the logic for canceling SelectionKeys and setting interest ops appropriately. You need to understand the cryptic ByteBuffer class and its variants, and possibly how ByteArrayOutputStream works as well. You need to figure out how to use these classes to store the data separately for each connection, keeping in mind that data from different connections will arrive interleaved. You also should not call select() once all sockets have been completely read, or select() will block - which means you have to keep track of pending hosts.

The callback framework I developed aims to relieve you of all these details. It lets you focus on the flow of calls between the client and each host along with the application logic once you get data from the hosts.

Usage:
Extend the abstract class ConnectionParam. (Let's name it ConnectionParamImpl)
Create a ConnectionParamImpl object for each connection and stuff these into an array.
Call ASyncSocketHandler.connect_all(array_of_connections) - the argument is the above array.

Since we extended the ConnectionParam class, we need to implement some methods. These are simply the callback methods that tell us that data has either been written to the network, or read from the network. This is where we get a chance to examine the data and do what we need to do from the application's point of view.

    // returns the next op we're interested in: OP_READ / OP_WRITE / OP_CLOSE
public abstract int connected_ok();
public abstract int connect_failed();
public abstract int write_ok(boolean eod);
public abstract int read_ok(boolean eod);



Each of the overridden methods needs to return an int signaling the next network operation you are interested in. The available options are:

ASyncSocketHandler.OP_READ
ASyncSocketHandler.OP_WRITE
ASyncSocketHandler.OP_CLOSE



This is a simple usage scenario that gets output from a bunch of known time of day servers.

There are two objects you need to implement:

ConnectionParamImpl class and ASyncEchoClient class.
ConnectionParamImpl represents a connection to a single host.
ASyncEchoClient is the application entry point.

import java.io.UnsupportedEncodingException;

/**
 * Connection to a time-of-day server: no request is set, so once connected the
 * handler just reads until the server is done and prints what it received.
 *
 * @author: thushara
 * @version: Nov 8, 2009
 */
public class ConnectionParamImpl extends ConnectionParam {

    public ConnectionParamImpl(byte[] ip, int port) {
        super(ip, port);
    }

    /** The server speaks first, so start reading as soon as the connection is up. */
    public int connected_ok() {
        return ASyncSocketHandler.OP_READ;
    }

    /** Nothing to recover on a failed connect; ask for the socket to be closed. */
    public int connect_failed() {
        return ASyncSocketHandler.OP_CLOSE;
    }

    /**
     * @param eod true iff the whole request has been written to the socket
     * @return OP_READ once the request is out, OP_WRITE to keep writing
     */
    public int write_ok(boolean eod) {
        if (eod) {
            return ASyncSocketHandler.OP_READ;
        }
        return ASyncSocketHandler.OP_WRITE;
    }

    /**
     * @param eod true iff the socket has no more data to deliver
     * @return OP_CLOSE after printing the complete response, otherwise OP_READ
     */
    public int read_ok(boolean eod) {
        if (!eod) {
            return ASyncSocketHandler.OP_READ;
        }
        print(getResponse());
        return ASyncSocketHandler.OP_CLOSE;
    }

    /** Prints the payload decoded as UTF-8. */
    private static void print(byte[] payload) {
        String text;
        try {
            text = new String(payload, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            System.out.println("bad chars - non utf-8");
            return;
        }
        System.out.println(text);
    }
}



/**
 * Demo entry point: queries a list of time-of-day servers concurrently through one
 * ASyncSocketHandler and prints each server's answer.
 *
 * @author: thushara
 * @version: Nov 8, 2009
 */
public class ASyncEchoClient {

    /** Port the time-of-day servers are contacted on. */
    private static final int TIME_PORT = 13;

    /** Dotted-quad IPv4 addresses of the servers to query. */
    private static final String[] SERVERS = {
            "64.90.182.55", "129.6.15.28", "129.6.15.29", "206.246.118.250", "64.236.96.53", "68.216.79.113", "208.66.175.36",
            "173.14.47.149", "64.113.32.5", "132.163.4.101", "132.163.4.102", "132.163.4.103", "192.43.244.18", "128.138.140.44",
            "128.138.188.172", "198.60.73.8", "131.107.13.100", "207.200.81.113", "69.25.96.13", "64.125.78.85", "64.147.116.229"};

    public static void main(String[] args) {
        ConnectionParam.verbose = true;
        ASyncSocketHandler.verbose = true;

        ASyncSocketHandler handler = new ASyncSocketHandler();

        ConnectionParam[] connections = new ConnectionParamImpl[SERVERS.length];
        int slot = 0;
        for (String dotted : SERVERS) {
            connections[slot++] = new ConnectionParamImpl(toBytes(dotted), TIME_PORT);
        }

        try {
            handler.connect_all(connections);
        } catch (Exception e) {
            System.err.println("unexpected :" + e.getMessage());
            e.printStackTrace();
        }
    }

    /** Converts a dotted-quad string into its raw address bytes. */
    private static byte[] toBytes(String dotted) {
        String[] octets = dotted.split("\\.");
        byte[] addr = new byte[octets.length];
        int i = 0;
        while (i < octets.length) {
            addr[i] = (byte) Integer.parseInt(octets[i]);
            i++;
        }
        return addr;
    }
}


Notice that you don't see a single reference to a socket or a byte buffer here. All the communication details are wrapped in ASyncSocketHandler.java and ConnectionParam.java.

This is the output you should see, basically the time of day as reported by each host:


55145 09-11-10 23:13:25 00 0 0 434.7 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 401.7 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 398.1 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 402.8 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 383.4 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 386.1 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 348.4 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 347.4 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 348.2 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 347.9 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 339.2 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 316.3 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 306.9 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 310.7 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 313.4 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 298.7 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 295.4 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 274.6 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 268.1 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 268.0 UTC(NIST) *


55145 09-11-10 23:13:25 00 0 0 275.9 UTC(NIST) *


This is the framework implementation. There are two files.

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CharacterCodingException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.net.InetSocketAddress;

/**
 * State for one asynchronous client connection driven by ASyncSocketHandler: the remote
 * address, the request bytes still to be written and the response bytes read so far.
 * Subclasses supply the protocol logic through the four abstract callbacks, each of which
 * returns the next operation wanted (ASyncSocketHandler.OP_READ, OP_WRITE or OP_CLOSE).
 *
 * @author: thushara
 * @version: Nov 6, 2009
 */
public abstract class ConnectionParam {
    private static final int DEFAULT_RESP_BUF_SIZE = 1024;

    private final byte[] ip;
    private final int port;
    private ByteBuffer request;
    // Receives bytes straight from the socket; spilled into outStrm whenever it fills up.
    private final ByteBuffer response;
    private final int respBufSize;
    // Everything read so far except the bytes still sitting in response.
    private final ByteArrayOutputStream outStrm;
    private SocketChannel client;

    public static boolean verbose = false;

    public ConnectionParam(byte[] ip, int port) {
        this(ip, port, DEFAULT_RESP_BUF_SIZE);
    }

    public ConnectionParam(byte[] ip, int port, String request) throws CharacterCodingException {
        this(ip, port);
        setRequest(request);
    }

    public ConnectionParam(byte[] ip, int port, String request, int respBufSize) throws CharacterCodingException {
        this(ip, port, respBufSize);
        setRequest(request);
    }

    /**
     * @param respBufSize size in bytes of the per-read socket buffer; must be positive
     * @throws IllegalArgumentException if respBufSize is not positive
     */
    public ConnectionParam(byte[] ip, int port, int respBufSize) {
        if (respBufSize <= 0) {
            throw new IllegalArgumentException("respBufSize must be positive: " + respBufSize);
        }
        this.ip = ip;
        this.port = port;
        this.respBufSize = respBufSize;
        // Allocated only after the size is known, so the requested size takes effect.
        this.response = ByteBuffer.allocate(respBufSize);
        this.outStrm = new ByteArrayOutputStream();
    }

    /** Sets the bytes to send to the server, encoded as UTF-8. */
    public void setRequest(String request) throws CharacterCodingException {
        Charset charset = Charset.forName("UTF-8");
        CharsetEncoder encoder = charset.newEncoder();
        this.request = encoder.encode(CharBuffer.wrap(request));
    }

    /** Returns a copy of every byte read from the server so far. */
    public byte[] getResponse() {
        byte[] spilled = outStrm.toByteArray();
        int pending = response.position();
        byte[] out = new byte[spilled.length + pending];
        System.arraycopy(spilled, 0, out, 0, spilled.length);
        // Read the not-yet-spilled bytes through a view so the buffer's own position and
        // limit (which the next socket read relies on) are left untouched.
        ByteBuffer view = response.duplicate();
        view.flip();
        view.get(out, spilled.length, pending);
        return out;
    }

    /** @throws UnknownHostException if ip is not a valid IPv4/IPv6 address length */
    private InetAddress getAddr() throws UnknownHostException {
        return InetAddress.getByAddress(ip);
    }

    /** Moves the bytes collected in response into outStrm and empties response for the next read. */
    private void store() {
        response.flip();
        outStrm.write(response.array(), response.arrayOffset() + response.position(), response.remaining());
        response.clear();
    }

    /**
     * Returns this connection's channel, opening it and starting a non-blocking connect on
     * first use. If opening or connecting fails the channel is closed and not cached, so
     * a later call starts over.
     */
    public SocketChannel getSocketChannel() throws IOException {
        if (client != null) return client;

        SocketChannel channel = SocketChannel.open();
        boolean ok = false;
        try {
            channel.configureBlocking(false);
            channel.connect(new InetSocketAddress(getAddr(), port));
            ok = true;
        } finally {
            if (!ok) {
                try {
                    channel.close();
                } catch (IOException ignored) {
                    // the original failure is what the caller needs to see
                }
            }
        }
        client = channel;
        return client;
    }

    public String toString() {
        try {
            return getAddr().toString() + ":" + port;
        } catch (UnknownHostException e) {
            return "";
        }
    }

    /**
     * Writes as much of the request as the socket accepts.
     *
     * @return true if the whole request has been written (or there is no request)
     */
    public boolean write() throws IOException {
        if (request == null) {
            return true; // nothing to send
        }
        int numBytes = getSocketChannel().write(request);
        if (numBytes == 0 && verbose) {
            System.err.println("nothing to write to server");
        }
        return !request.hasRemaining();
    }

    /**
     * Drains whatever the socket currently has into the response.
     *
     * @return true if the server has finished sending (end of stream, or the read failed),
     *         meaning our end of the socket should be closed
     */
    public boolean read() throws IOException {
        SocketChannel channel = getSocketChannel();
        while (true) {
            int numBytes;
            try {
                numBytes = channel.read(response);
            } catch (IOException e) {
                // Treat a broken connection as end of data, keeping what was read.
                store();
                return true;
            }
            if (numBytes == -1) {
                store();
                return true;
            }
            if (numBytes == 0 && verbose) {
                System.err.println("nothing to read from server");
            }
            if (numBytes == 0 || response.hasRemaining()) {
                return false; // socket drained for now; wait for the next readiness event
            }
            store(); // buffer full: spill it and keep reading
        }
    }

    // returns the next op we're interested in: OP_READ / OP_WRITE / OP_CLOSE
    public abstract int connected_ok();
    public abstract int connect_failed();
    public abstract int write_ok(boolean eod);
    public abstract int read_ok(boolean eod);
}



import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.SelectionKey;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.Iterator;
import java.net.ConnectException;

/**
 * Selector-based event loop that drives many non-blocking client connections on one
 * thread. Each connection is a ConnectionParam; after every connect/write/read event the
 * handler invokes that object's callback and uses the returned value (OP_READ, OP_WRITE
 * or OP_CLOSE) as the connection's next interest. No synchronization is used, so
 * connect() is meant to be called from the loop's own thread (e.g. from a callback).
 *
 * @author: thushara
 * @version: Nov 6, 2009
 */
public class ASyncSocketHandler {
    private Selector selector;
    private HashSet<ConnectionParam> clients = new HashSet<ConnectionParam>();
    // select() timeout in ms; 0 (the default) makes select() wait until a socket is ready.
    private int timeout;

    public static final int OP_READ = SelectionKey.OP_READ;
    public static final int OP_WRITE = SelectionKey.OP_WRITE;
    public static final int OP_CLOSE = 128; //leave enough room for possible expansion of SelectionKey enum

    public static boolean verbose = false;

    /**
     * Starts a non-blocking connect for connParm and registers it with the selector.
     * The selector is opened by connect_all(), so this must not be called before it.
     */
    public void connect(ConnectionParam connParm) throws IOException {
        SocketChannel client = connParm.getSocketChannel();
        SelectionKey clientKey = client.register(selector, SelectionKey.OP_CONNECT);
        clientKey.attach(connParm);
        clients.add(connParm);
    }

    /**
     * Connects every non-null entry of connParms and runs the event loop until all
     * connections are closed, select() fails, or no socket becomes ready within the timeout.
     */
    public void connect_all(ConnectionParam[] connParms) throws IOException {
        selector = Selector.open();
        for (ConnectionParam connParm : connParms) {
            if (connParm != null) { // callers may leave slots empty for inputs they skipped
                connect(connParm);
            }
        }

        while (!clients.isEmpty()) {
            if (verbose) {
                System.out.println("waiting for sockets to be ready. currently waiting for:");
                dumpClients();
            }
            int numReady;
            try {
                numReady = selector.select(timeout);
            } catch (IOException e) {
                if (verbose) {
                    System.err.println("select failed, unrecoverable!" + e.getMessage());
                }
                break;
            }
            if (numReady <= 0) {
                if (verbose) {
                    System.err.println("No sockets ready for " + timeout/1000 + " secs, consider dead remotes, bailing...");
                }
                break;
            }

            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                ConnectionParam connParm = (ConnectionParam) key.attachment();
                try {
                    dispatch(key, connParm);
                } catch (IOException e) {
                    // Drop the broken connection so the loop can still finish.
                    if (verbose) {
                        System.err.println("I/O error on " + connParm + ", closing: " + e.getMessage());
                    }
                    close(key, connParm);
                }
            }
        }
    }

    /** Handles one ready key: completes the connect, then services write and read readiness. */
    private void dispatch(SelectionKey key, ConnectionParam connParm) throws IOException {
        if (!key.isValid()) return;
        SocketChannel channel = (SocketChannel) key.channel();

        if (key.isConnectable() && channel.isConnectionPending()) {
            boolean connected;
            try {
                connected = channel.finishConnect();
            } catch (IOException e) {
                if (verbose) {
                    System.err.println("error processing " + connParm.toString());
                    e.printStackTrace();
                    System.err.println("not connected, giving up");
                }
                connParm.connect_failed();
                close(key, connParm);
                return;
            }
            if (!connected) {
                return; // handshake still in progress; stay interested in OP_CONNECT
            }
            if (!applyNextOp(key, connParm, connParm.connected_ok())) return;
        }

        // Each step re-checks validity: a previous callback may have closed the connection,
        // and querying a cancelled key throws CancelledKeyException.
        if (key.isValid() && key.isWritable()) {
            boolean completedWrite = connParm.write();
            if (!applyNextOp(key, connParm, connParm.write_ok(completedWrite))) return;
        }

        if (key.isValid() && key.isReadable()) {
            boolean endOfData = connParm.read();
            int op = connParm.read_ok(endOfData);
            // once the peer has finished sending there is nothing left to do on this socket
            applyNextOp(key, connParm, endOfData ? OP_CLOSE : op);
        }
    }

    /**
     * Makes op the key's next interest set, or closes the connection for OP_CLOSE.
     *
     * @return true if the connection is still open
     */
    private boolean applyNextOp(SelectionKey key, ConnectionParam connParm, int op) {
        if (op == OP_CLOSE) {
            close(key, connParm);
            return false;
        }
        key.interestOps(op);
        return true;
    }

    /** Closes the channel, cancels its key and stops tracking the connection. */
    private void close(SelectionKey key, ConnectionParam connParm) {
        try {
            key.channel().close();
        } catch (IOException ignored) {
            // best effort: the connection is being abandoned either way
        }
        key.cancel();
        clients.remove(connParm);
    }

    private void dumpClients() {
        for (ConnectionParam connParm : clients) {
            System.out.println(connParm.toString());
        }
    }

    /** Sets the select() timeout in milliseconds (0 waits indefinitely). */
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

}



I'm making the project available here:

Monday, November 09, 2009

Java : fetching web pages using NIO - timeouts


I'm developing a call back framework on top of Java NIO, that will allow for easier implementations of asynchronous page downloads.

As you start adding sockets to a single selector, you run into request time-out issues. This happens because you normally add all the sockets to the selector first and then wait for the sockets that are ready. Some sockets, after waiting a few seconds for the request, will time out at the remote end.

The HTTP response code for this is 408. It is easy to handle: remove the SelectionKey from the selector and add the socket back to the selector. Since the application will now be dealing with fewer sockets, the socket will most probably not time out the next time.

The latency is mostly in domain name resolution. I will next try doing name resolution totally outside socket registration, so I don't eat into the connection time with the slow DNS lookup calls. This should reduce the number of 408 responses, and I should be able to add more sockets to the selector.

Wednesday, November 04, 2009

Resolving domain names quickly with Java

Domain name resolution can take upward of 5 seconds depending on your ISP. Generally, when the resolution works, the DNS response comes back quickly, within a second. Any DNS entry not discovered within a second is likely to be dead.

Java provides InetAddress.getByName(domain) that does the DNS resolution for the given domain. There is no timeout you can specify to this function.

In the interest of providing a responsive user interface, as well as improving performance on background tasks, you can hack a timeout using a Thread. Feel free to re-use this code:

    /**
     * Runnable wrapper around InetAddress.getByName so the blocking lookup can run on its
     * own thread and be abandoned after a timeout. get() returns null until the lookup
     * succeeds, and stays null if the host cannot be resolved.
     */
    public class DNSResolver implements Runnable {
        private String domain;
        private InetAddress inetAddr;

        public DNSResolver(String domain) {
            this.domain = domain;
        }

        public void run() {
            InetAddress resolved;
            try {
                resolved = InetAddress.getByName(domain);
            } catch (UnknownHostException e) {
                return; // unresolvable: leave the result null
            }
            set(resolved);
        }

        /** Stores the lookup result; synchronized so the waiting thread sees it. */
        public synchronized void set(InetAddress inetAddr) {
            this.inetAddr = inetAddr;
        }

        /** Returns the resolved address, or null if none has been stored yet. */
        public synchronized InetAddress get() {
            return inetAddr;
        }
    }


Use this class as follows:

                DNSResolver dnsRes = new DNSResolver(host);
Thread t = new Thread(dnsRes);
t.start();
t.join(1000);
InetAddress inetAddr = dnsRes.get();



Basically, we start a thread that makes the blocking call to resolve the domain name, and then wait up to 1 second (1000 ms) for that thread to finish. If the DNS resolution completes within a second, the thread exits and join() returns early. If not, join() returns after the 1000 ms timeout, but the thread keeps running until the lookup eventually finishes; by then the main thread has stopped caring. Either way, the main thread continues after at most 1 second, treating the domain name as unresolved if get() still returns null.

Caveat:
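Note that if you resolve a large number of unreachable domains, you will soon have lots of lingering threads taking up memory (each thread reserves its own stack), and you could run out of memory. These threads are also non-daemon by default, so pending lookups can keep the JVM from exiting; call t.setDaemon(true) before t.start() if that matters. Keep this in mind as you use this class.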

Possible usage scenario:
If you are doing socket communication using NIO and need to resolve DNS for each socket connection attached to the selector, it is important to time out DNS lookups quickly; otherwise some sockets might get closed at the remote end. This is especially true for HTTP connections: if you connect to an HTTP server and do not send a request within a few seconds, the server will close its end of the socket.