Thursday, November 19, 2009

Java: building an asynchronous web page fetcher


Today I used FishHooks, the epoll()-based framework I built earlier, to fetch web pages. FishHooks is protocol-agnostic, so most of the code in the fetcher deals with HTTP: sending the proper request, parsing the response, and so on.

There are, as usual, two files. The driver program ASyncWgetClient.java will use FishHooks to start the epoll() loop. WgetParam.java will extend ConnectionParam and implement the state machine peculiar to the HTTP request/response cycle. Among other things, we will handle the 30X redirects here.


import java.util.List;
import java.util.ArrayList;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.CharacterCodingException;

/**
 * Driver program: resolves a fixed list of URLs, builds one {@code WgetParam}
 * per resolvable host and hands them all to an {@code ASyncSocketHandler}.
 *
 * @author: thushara
 * @version: Nov 18, 2009
 */
public class ASyncWgetClient {

    /** How long to wait for a single DNS lookup before giving up on the host. */
    private static final long DNS_TIMEOUT_MILLIS = 1000;

    /**
     * Resolves every URL in the built-in list, then starts all fetches at once
     * via {@code ASyncSocketHandler.connect_all}. URLs that are malformed, use
     * https, or cannot be resolved within the timeout are skipped and noted in
     * an error file.
     */
    public void process() {

        // Everything needed to build the request for one successfully resolved URL.
        class DNSBlob {
            final String url;
            final String path;
            final String hostHeader;
            final int port;
            final URLWithRetries retryURL;
            final byte[] inetAddr;

            DNSBlob(String url, String path, String hostHeader, int port, URLWithRetries retryURL, byte[] inetAddr) {
                this.url = url;
                this.path = path;
                this.hostHeader = hostHeader;
                this.port = port;
                this.retryURL = retryURL;
                this.inetAddr = inetAddr;
            }
        }

        //ConnectionParam.verbose = true;
        //ASyncSocketHandler.verbose = true;

        ASyncSocketHandler hndlr = new ASyncSocketHandler();

        String[] urls = {"http://zerohedge.com", "http://moneycentral.msn.com", "http://www.nakedcapitalism.com",
                "http://youtube.com", "http://democracynow.org", "http://love.com", "http://anncoulter.com",
                "http://peace.org", "http://grunge.com", "http://rockandroll.com", "http://shopping.com",
                "http://luck.com", "http://schools.org", "http://whipitthemovie.com",
                "http://war.org", "http://benstiller.com", "http://www.oprahwinfrey.com", "http://porche.com",
                "http://spanish.com", "http://healingart.com", "http://flash.com"};

        List<DNSBlob> dnsList = new ArrayList<DNSBlob>();

        for (String url : urls) {
            URLWithRetries retryURL = new URLWithRetries(url);
            URL urlObj;
            try {
                urlObj = new URL(url);
            } catch (MalformedURLException e) {
                WgetParam.writeErrorFile(WgetParam.getErrorFileName(url), url+" is not correct, skipping...");
                continue;
            }
            if (urlObj.getProtocol().equals("https")) {
                WgetParam.writeErrorFile(WgetParam.getErrorFileName(url), url+" not handling https, skipping...");
                continue;
            }
            String host = urlObj.getHost();
            String path = urlObj.getPath();
            if (urlObj.getQuery() != null) {
                path += ("?"+urlObj.getQuery());
            }
            if (path.length() == 0) {
                path = "/";
            }

            // Honour an explicit port in the URL; getPort() is -1 when none was given.
            int explicitPort = urlObj.getPort();
            int port = explicitPort != -1 ? explicitPort : urlObj.getDefaultPort();
            // The Host header carries the port only when it is not the scheme default.
            String hostHeader = port == urlObj.getDefaultPort() ? host : host + ":" + port;

            // Resolve on a helper thread so a slow lookup cannot stall us for
            // longer than DNS_TIMEOUT_MILLIS.
            DNSResolver dnsRes = new DNSResolver(host);
            Thread t = new Thread(dnsRes);
            // The result of a lookup that outlives the timeout is never read,
            // so a hung lookup must not keep the JVM alive.
            t.setDaemon(true);
            t.start();
            try {
                t.join(DNS_TIMEOUT_MILLIS);
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever requested it and stop.
                Thread.currentThread().interrupt();
                System.err.println("interrupted while resolving " + host + ", aborting");
                return;
            }
            byte[] inetAddr = dnsRes.get();

            if (inetAddr == null) {
                // Either the host is unknown or the lookup did not finish in time.
                WgetParam.writeErrorFile(WgetParam.getErrorFileName(url), host + " has no DNS entry");
                continue;
            }

            dnsList.add(new DNSBlob(url, path, hostHeader, port, retryURL, inetAddr));
        }

        // Collect only the params that were actually built, so the handler is
        // never handed null slots for skipped or failed URLs.
        List<ConnectionParam> ready = new ArrayList<ConnectionParam>(dnsList.size());
        for (DNSBlob dnsBlob : dnsList) {
            String request = "GET "+dnsBlob.path+" HTTP/1.0\r\nHost: "+dnsBlob.hostHeader+"\r\nAccept: */*\r\n\r\n";
            try {
                ready.add(new WgetParam(dnsBlob.inetAddr, dnsBlob.port, request, dnsBlob.retryURL, hndlr));
            } catch (CharacterCodingException e) {
                WgetParam.writeErrorFile(WgetParam.getErrorFileName(dnsBlob.url), e.getMessage());
            }
        }
        ConnectionParam[] params = ready.toArray(new ConnectionParam[ready.size()]);

        try {
            hndlr.connect_all(params);
        } catch (Exception e) {
            System.err.println("unexpected :" + e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * Runnable that resolves one host name. The address stays {@code null}
     * when the host is unknown or the lookup has not completed yet.
     */
    public class DNSResolver implements Runnable {
        private final String domain;
        private byte[] inetAddr;

        public DNSResolver(String domain) {
            this.domain = domain;
        }

        public void run() {
            try {
                byte[] addr = InetAddress.getByName(domain).getAddress();
                set(addr);
            } catch (UnknownHostException e) {
                // Deliberately ignored: a null address tells the caller that
                // resolution failed, and the caller records the error.
            }
        }

        public synchronized void set(byte[] inetAddr) {
            this.inetAddr = inetAddr;
        }

        /** @return the raw resolved address, or {@code null} if none is available */
        public synchronized byte[] get() {
            return inetAddr;
        }
    }

    public static void main(String[] args) {
        ASyncSocketHandler.verbose = true;
        ASyncWgetClient aWget = new ASyncWgetClient();
        aWget.process();
    }
}




import java.io.*;
import java.nio.charset.CharacterCodingException;
import java.nio.channels.FileChannel;
import java.nio.MappedByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Connection parameters for one HTTP GET, implementing the callbacks that
 * drive the request/response cycle: write the request, read the response,
 * then save the page, follow a redirect, retry, or record an error.
 *
 * @author: thushara
 * @version: Nov 18, 2009
 */
public class WgetParam extends ConnectionParam {

    public static final String OUTPUT_DIR = "/Users/thushara/affinity3/";

    /** Index of the first status-code digit in a status line such as "HTTP/1.0 200 OK". */
    private static final int STATUS_CODE_OFFSET = 9;

    private final URLWithRetries retryURL;
    private final ASyncSocketHandler hndlr;

    public WgetParam(byte[] ip, int port, String request, URLWithRetries retryURL, ASyncSocketHandler hndlr) throws CharacterCodingException {
        super(ip, port, request);
        this.retryURL = retryURL;
        this.hndlr = hndlr;
    }

    /**
     * After connecting we can proceed to send the GET request.
     *
     * @return the next op we're interested in: OP_READ / OP_WRITE / OP_CLOSE
     */
    public int connected_ok() {
        return ASyncSocketHandler.OP_WRITE;
    }

    /** @return OP_CLOSE, since there is nothing to do on a failed connect */
    public int connect_failed() {
        return ASyncSocketHandler.OP_CLOSE;
    }

    /**
     * If the request is not fully written, signal to write again; otherwise
     * signal that we want to read the response.
     *
     * @param eod true iff the complete request was written to the socket
     * @return the next op we're interested in: OP_READ / OP_WRITE / OP_CLOSE
     */
    public int write_ok(boolean eod) {
        return eod ? ASyncSocketHandler.OP_READ : ASyncSocketHandler.OP_WRITE;
    }

    /**
     * Once all data has been read the response is processed and the socket is
     * closed; until then we keep reading.
     *
     * @param eod true iff all data was read and the socket has no more data
     * @return OP_CLOSE when the response is complete, OP_READ otherwise
     */
    public int read_ok(boolean eod) {
        if (!eod) {
            return ASyncSocketHandler.OP_READ;
        }
        handleResponse(getResponse());
        return ASyncSocketHandler.OP_CLOSE;
    }

    /** Acts on a complete response according to its HTTP status code. */
    private void handleResponse(byte[] resp) {
        String origURL = retryURL.getOrigURL();

        int code = parseStatusCode(resp);
        if (code < 0) {
            // Too short or no numeric status where one is expected.
            writeErrorFile(getErrorFileName(origURL), "malformed or empty HTTP response");
            return;
        }

        switch (code) {
            case 200:
            case 403:
            case 404:
            case 400:
                try {
                    writeFileWithoutHeaders(resp, OUTPUT_DIR + getFileName(origURL));
                } catch (IOException e) {
                    writeErrorFile(getErrorFileName(origURL), e.getMessage());
                }
                break;
            case 301:
            case 302:
            case 303:
            case 305: // Location: contains proxy, check if it is full url or just the proxy server name
            case 307:
            case 308:
                if (retryURL.hitLimit()) {
                    writeErrorFile(getErrorFileName(origURL), "URL attempts too many redirections - giving up");
                    break;
                }
                String redirURL;
                try {
                    redirURL = getLocation(resp, retryURL.getDomain());
                } catch (MalformedURLException e) {
                    writeErrorFile(getErrorFileName(origURL), e.getMessage());
                    break; // no target to follow
                }
                if (redirURL == null) {
                    writeErrorFile(getErrorFileName(origURL), "redirect response " + code + " has no Location header");
                    break;
                }
                addURL(redirURL);
                break;
            case 408: // request timed out, try again
                // NOTE(review): assumes hitLimit() also bounds plain retries,
                // not only redirects - confirm against URLWithRetries.
                if (retryURL.hitLimit()) {
                    writeErrorFile(getErrorFileName(origURL), "request timed out too many times - giving up");
                    break;
                }
                addURL(retryURL.url);
                break;
            default:
                writeErrorFile(getErrorFileName(origURL), "unexpected response code: " + code);
                break;
        }
    }

    /**
     * Reads the three-digit status code at the fixed offset used by
     * "HTTP/1.x NNN" status lines.
     *
     * @return the status code, or -1 if the response is null, too short or
     *         does not have digits there
     */
    private static int parseStatusCode(byte[] resp) {
        if (resp == null || resp.length < STATUS_CODE_OFFSET + 3) {
            return -1;
        }
        int code = 0;
        for (int i = STATUS_CODE_OFFSET; i < STATUS_CODE_OFFSET + 3; i++) {
            int digit = resp[i] - '0';
            if (digit < 0 || digit > 9) {
                return -1;
            }
            code = code * 10 + digit;
        }
        return code;
    }

    /**
     * Queues a fetch of {@code redirURL} on the same handler, reusing this
     * request's retry bookkeeping. Any failure is noted in the error file of
     * the original URL.
     */
    private void addURL(String redirURL) {
        retryURL.nextURL(redirURL);
        String errFile = getErrorFileName(retryURL.getOrigURL());

        URL url;
        try {
            url = new URL(retryURL.url);
        } catch (MalformedURLException e) {
            writeErrorFile(errFile, e.getMessage());
            return;
        }
        if (url.getProtocol().equals("https")) {
            writeErrorFile(errFile, url+" not handling https, skipping...");
            return;
        }
        String host = url.getHost();
        String path = url.getPath();
        if (url.getQuery() != null) {
            path += ("?"+url.getQuery());
        }
        if (path.length() == 0) {
            path = "/";
        }

        // Honour an explicit port in the URL; getPort() is -1 when none was given.
        int explicitPort = url.getPort();
        int port = explicitPort != -1 ? explicitPort : url.getDefaultPort();
        // The Host header carries the port only when it is not the scheme default.
        String hostHeader = port == url.getDefaultPort() ? host : host + ":" + port;

        // NOTE(review): this lookup blocks the calling thread; if addURL runs
        // on the event-loop thread a slow DNS server stalls every connection.
        InetAddress inetAddr;
        try {
            inetAddr = InetAddress.getByName(host);
        } catch (UnknownHostException e) {
            writeErrorFile(errFile, e.getMessage());
            return;
        }
        String request = "GET "+path+" HTTP/1.0\r\nHost: "+hostHeader+"\r\nAccept: */*\r\n\r\n";
        ConnectionParam connParm;
        try {
            connParm = new WgetParam(inetAddr.getAddress(), port, request, retryURL, hndlr);
        } catch (CharacterCodingException e) {
            writeErrorFile(errFile, e.getMessage());
            return;
        }
        try {
            hndlr.connect(connParm);
        } catch (ASyncSocketHandler.SelectorNotOpenException e) {
            // Previously swallowed silently, which lost the URL without a trace.
            writeErrorFile(errFile, "selector not open, cannot fetch " + retryURL.url);
        } catch (IOException e) {
            writeErrorFile(errFile, e.getMessage());
        }
    }

    /**
     * Derives the output file name for a URL: the hex MD5 digest of its
     * UTF-8 bytes followed by ".url".
     */
    private static String getFileName(String url) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            // Hash the whole encoded URL; its byte length can exceed
            // url.length() when the URL contains non-ASCII characters.
            byte[] hs = md.digest(url.getBytes("UTF-8"));
            StringBuilder name = new StringBuilder(hs.length * 2 + 4);
            for (byte b : hs) {
                name.append(String.format("%02x", b));
            }
            return name.append(".url").toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is a required algorithm on every Java platform.
            throw new IllegalStateException("MD5 digest not available", e);
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is a required charset on every Java platform.
            throw new IllegalStateException("UTF-8 charset not available", e);
        }
    }

    /**
     * Extracts the redirect target from the response headers.
     *
     * @param httpHeader the raw response, starting with the status line
     * @param domain     prefix prepended to a relative Location value
     * @return the Location value if it already starts with http:// or
     *         https://, otherwise {@code domain} joined to it with a single
     *         '/'; {@code null} if there is no usable Location header
     */
    private static String getLocation(byte[] httpHeader, String domain) {
        final String field = "Location:";
        String location = null;

        int pos = 0;
        while (pos < httpHeader.length) {
            // Collect one header line, without its line terminator.
            StringBuilder buf = new StringBuilder();
            int eol = pos;
            while (eol < httpHeader.length && httpHeader[eol] != '\n') {
                buf.append((char) (httpHeader[eol] & 0xff));
                eol++;
            }
            int len = buf.length();
            if (len > 0 && buf.charAt(len - 1) == '\r') {
                buf.setLength(len - 1);
            }
            if (buf.length() == 0) {
                break; // blank line: end of headers, no Location field
            }
            String line = buf.toString();
            // Header names are case-insensitive.
            if (line.regionMatches(true, 0, field, 0, field.length())) {
                location = line.substring(field.length()).trim();
                break;
            }
            pos = eol + 1;
        }

        if (location == null || location.length() == 0) {
            return null;
        }
        if (location.regionMatches(true, 0, "http://", 0, 7)
                || location.regionMatches(true, 0, "https://", 0, 8)) {
            return location;
        }
        return domain + (location.startsWith("/") ? location : "/" + location);
    }

    /**
     * Writes the response body, i.e. everything after the blank line that
     * ends the headers, to {@code path}.
     *
     * @throws IOException if the response has no blank line ending the
     *                     headers or the file cannot be written
     */
    private void writeFileWithoutHeaders(byte[] bytes, String path) throws IOException {
        int htmlOffset = findBodyOffset(bytes);
        if (htmlOffset < 0) {
            throw new IOException("response has no blank line terminating the headers");
        }
        FileOutputStream out = new FileOutputStream(path);
        try {
            out.write(bytes, htmlOffset, bytes.length - htmlOffset);
        } finally {
            out.close();
        }
    }

    /**
     * Finds where the body starts.
     *
     * @return the index just past the first empty line (a bare "\n" or
     *         "\r\n"), or -1 if there is none
     */
    private static int findBodyOffset(byte[] bytes) {
        int lineStart = 0;
        for (int i = 0; i < bytes.length; i++) {
            if (bytes[i] != '\n') {
                continue;
            }
            int lineLen = i - lineStart;
            if (lineLen == 0 || (lineLen == 1 && bytes[lineStart] == '\r')) {
                return i + 1;
            }
            lineStart = i + 1;
        }
        return -1;
    }

    //two convenience methods to note errors, used from both WgetParam class and
    //ASyncWgetClient class.

    /** @return the output file name for {@code url} with "url" replaced by "bad" */
    public static String getErrorFileName(String url) {
        String name = getFileName(url);
        return name.substring(0, name.length() - 3) + "bad";
    }

    /**
     * Writes {@code error} to the file {@code fName} under OUTPUT_DIR.
     * Failures are reported on stderr and never thrown.
     */
    public static void writeErrorFile(String fName, String error) {
        try {
            BufferedWriter out = new BufferedWriter(new FileWriter(OUTPUT_DIR+fName));
            try {
                // Exception messages may be null; writing null would throw.
                out.write(error == null ? "unknown error" : error);
            } finally {
                out.close();
            }
        } catch (Exception e) {//Catch exception if any
            System.err.println("Error writing error file: " + e.getMessage());
        }
    }

}



As you can see, there is a lot more code here than in the simpler client that talks to time-of-day servers. Most of it deals with the HTTP protocol itself. You still don't deal with sockets, channels or byte buffers.

1 comment:

vitamin b said...

I am looking for a developer who can develop/personalize applications for a community based website. It must be coded in Java on BEA WebLogic platform and Oracle DB. The specific applications we are looking for are a CMS system, Community.