Friday, June 05, 2009

Hadoop - reading large lines (several MB) is slow


I ran into a performance problem running a Hadoop map/reduce job on an input that at times contained lines as long as 200 MB. The culprit was org.apache.hadoop.util.LineReader.

LineReader uses org.apache.hadoop.io.Text to accumulate each line as it reads. Unfortunately the Text class does not behave well for large text: it grows its backing array to exactly the size needed on each append, with no geometric over-allocation, so building up an n-byte line copies on the order of n² bytes.
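
If I remember the Text source of that era correctly, its append path boils down to something like this (a paraphrased sketch, not the verbatim Hadoop code):

// Paraphrased sketch of Text's growth strategy at the time; the real
// method is Text.setCapacity(int, boolean).
private void setCapacity(int len, boolean keepData) {
  if (bytes == null || bytes.length < len) {
    byte[] newBytes = new byte[len];  // exact fit: no doubling
    if (bytes != null && keepData) {
      System.arraycopy(bytes, 0, newBytes, 0, length);
    }
    bytes = newBytes;
  }
}

Because the new array is never larger than strictly necessary, every subsequent append fails the capacity check again and triggers another full copy.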

Here is the YourKit profile of a simple block of code using the Text class:



Here is the same profile with Text replaced by ByteArrayOutputStream:



Notice that the Text.append version took 10 times longer to run.
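
The profiled block is essentially just an append loop. A minimal sketch of it (chunk size and iteration count here are illustrative, not the exact values I profiled) looks like this:

import java.io.ByteArrayOutputStream;
import org.apache.hadoop.io.Text;

public class AppendBenchmark {
  public static void main(String[] args) {
    byte[] chunk = new byte[64 * 1024]; // append 64 KB per iteration
    int iterations = 1600;              // ~100 MB total

    // Text.append: each call reallocates the backing array and
    // recopies everything appended so far, so this loop is quadratic.
    Text text = new Text();
    long t0 = System.currentTimeMillis();
    for (int i = 0; i < iterations; i++) {
      text.append(chunk, 0, chunk.length);
    }
    System.out.println("Text.append: " + (System.currentTimeMillis() - t0) + " ms");

    // ByteArrayOutputStream: the internal buffer grows geometrically,
    // so appends are amortized constant time per byte.
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    t0 = System.currentTimeMillis();
    for (int i = 0; i < iterations; i++) {
      os.write(chunk, 0, chunk.length);
    }
    System.out.println("ByteArrayOutputStream: " + (System.currentTimeMillis() - t0) + " ms");
  }
}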

With this simple change to LineReader, a map/reduce task that initially ran for over 20 minutes and then crashed (the Hadoop TaskTracker was timing out child tasks that took too long) finished in under 30 seconds:

  public int readLine(Text str, int maxLineLength,
                      int maxBytesToConsume) throws IOException {
    str.clear();
    boolean hadFinalNewline = false;
    boolean hadFinalReturn = false;
    boolean hitEndOfFile = false;
    int startPosn = bufferPosn;
    long bytesConsumed = 0;
    // Accumulate the line here instead of appending to Text directly;
    // ByteArrayOutputStream grows its buffer geometrically, so appends
    // stay cheap even for very long lines.
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    outerLoop: while (true) {
      if (bufferPosn >= bufferLength) {
        if (!backfill()) {
          hitEndOfFile = true;
          break;
        }
      }
      startPosn = bufferPosn;
      for (; bufferPosn < bufferLength; ++bufferPosn) {
        switch (buffer[bufferPosn]) {
          case '\n':
            hadFinalNewline = true;
            bufferPosn += 1;
            break outerLoop;
          case '\r':
            if (hadFinalReturn) {
              // leave this \r in the stream, so we'll get it next time
              break outerLoop;
            }
            hadFinalReturn = true;
            break;
          default:
            if (hadFinalReturn) {
              break outerLoop;
            }
        }
      }
      bytesConsumed += bufferPosn - startPosn;
      int length = bufferPosn - startPosn - (hadFinalReturn ? 1 : 0);
      length = (int) Math.min(length, maxLineLength - os.size());
      if (length >= 0) {
        os.write(buffer, startPosn, length);
        LOG.info("os.size=" + os.size() + ", wrote " + length
            + " bytes starting at offset " + startPosn);
      }
      if (bytesConsumed >= maxBytesToConsume) {
        // Copy the accumulated bytes into the caller's Text only once,
        // instead of appending chunk by chunk.
        str.set(os.toByteArray());
        return (int) Math.min(bytesConsumed, (long) Integer.MAX_VALUE);
      }
    }
    LOG.info("finished reading line");
    int newlineLength = (hadFinalNewline ? 1 : 0) + (hadFinalReturn ? 1 : 0);
    if (!hitEndOfFile) {
      bytesConsumed += bufferPosn - startPosn;
      int length = bufferPosn - startPosn - newlineLength;
      length = (int) Math.min(length, maxLineLength - os.size());
      if (length > 0) {
        os.write(buffer, startPosn, length);
      }
    }

    // Single copy into the caller's Text at the end of the line.
    str.set(os.toByteArray());
    return (int) Math.min(bytesConsumed, (long) Integer.MAX_VALUE);
  }
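
For context, readLine is normally driven by LineRecordReader, one call per map input record. A hypothetical driver (not actual Hadoop code; the stream and Configuration are assumed to come from the caller) looks like this:

// Hypothetical sketch of how the patched readLine gets exercised.
void dumpLineLengths(InputStream in, Configuration conf) throws IOException {
  LineReader reader = new LineReader(in, conf);
  Text line = new Text();
  int bytesRead;
  while ((bytesRead = reader.readLine(line, Integer.MAX_VALUE,
                                      Integer.MAX_VALUE)) > 0) {
    System.out.println("read a " + line.getLength() + " byte line");
  }
}

The key property of the patch is that the caller's Text is populated exactly once per line, via str.set(os.toByteArray()), instead of growing through many small appends.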
