Except that this is not strictly correct: the output of combine() can be fed back into the combine() function for repeated aggregation. In general this does not cause a problem, but you can write incorrect code if you are not aware of this detail. An example is a simple counting program.
This program counts the terms present in the given documents. In the reducer below, the sum is incremented by 1 for each element in [c1, c2, ...] because we know that all counts in this array are "1"s: the mapper only ever emits "1"s.
class Mapper
    method Map(docid id, doc d)
        for all term t in doc d do
            Emit(term t, count 1)

class Reducer
    method Reduce(term t, counts [c1, c2, ...])
        sum = 0
        for all count c in [c1, c2, ...] do
            sum = sum + 1
        Emit(term t, count sum)
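For concreteness, here is roughly what that pseudocode looks like against the Hadoop Java API. This is a sketch under the usual WordCount-style setup; the class names (TermMapper, CountingReducer) are mine, not from any particular codebase.

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Mapper: emit (term, 1) for every term in the input value.
class TermMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text term = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            term.set(tokens.nextToken());
            context.write(term, ONE); // the mapper only ever emits "1"s
        }
    }
}

// Reducer: sum = sum + 1 per element, mirroring the pseudocode.
// This is only correct while every incoming value really is a "1".
class CountingReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable ignored : values) {
            sum = sum + 1; // counts elements, ignores their values
        }
        context.write(key, new IntWritable(sum));
    }
}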
Now let's add a combiner to aggregate the terms locally.
class Combiner
    method Combine(term t, [c1, c2, ...])
        sum = 0
        for all count c in [c1, c2, ...] do
            sum = sum + 1
        Emit(term t, count sum)
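In Hadoop, wiring the combiner in is a one-line addition in the driver. A sketch, reusing the hypothetical TermMapper and CountingReducer classes from above; everything else is standard job-setup boilerplate:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TermCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "term count");
        job.setJarByClass(TermCountDriver.class);
        job.setMapperClass(TermMapper.class);
        job.setCombinerClass(CountingReducer.class); // combiner identical to the reducer
        job.setReducerClass(CountingReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}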
Note that this combiner is identical to the reducer: the sum is incremented by 1 for each "1" in the array. But now we must change the reducer, because the combiner has already done a first round of aggregation and the reducer will receive partial sums rather than "1"s. So we change the reducer to this:
class Reducer
    method Reduce(term t, counts [c1, c2, ...])
        sum = 0
        for all count c in [c1, c2, ...] do
            sum = sum + c
        Emit(term t, count sum)
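In Java terms, the only change is that the loop now reads each value instead of counting elements. Again a sketch mirroring the pseudocode (SumReducer is a hypothetical name):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Reducer: sum = sum + c, so it also works on partial sums.
class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable c : values) {
            sum = sum + c.get(); // adds the value itself, not 1 per element
        }
        context.write(key, new IntWritable(sum));
    }
}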
The thinking here is that the array fed into the combiner can never contain values other than "1"s. That assumption is wrong: as we said, the output of a combiner, which contains aggregated values larger than "1", can itself be fed back into the combine() function. The correct combine() is therefore as follows:
class Combiner
    method Combine(term t, [c1, c2, ...])
        sum = 0
        for all count c in [c1, c2, ...] do
            sum = sum + c
        Emit(term t, count sum)
In fact, this is identical to the reduce() function.
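Because the two are identical, in Hadoop you would typically register one class for both roles (Hadoop's own WordCount example does exactly this with its IntSumReducer). Using the hypothetical SumReducer sketch from above:

job.setCombinerClass(SumReducer.class);
job.setReducerClass(SumReducer.class);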
We can peek under the hood in the Hadoop source code to see where this two-step combining happens. Inside org.apache.hadoop.mapred.MapTask, the flush() method has two function calls of relevance:
sortAndSpill()
mergeParts()
sortAndSpill() is also called earlier, by a spill thread. flush() makes sure that any remaining data is properly spilled, then interrupts the spill thread, waits for it to end, and finally calls mergeParts().
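When the spill thread kicks in is governed by configuration. As a rough sketch (property names per Hadoop 2.x with their usual defaults; worth verifying against your version):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
// size of the in-memory buffer that holds map output before spilling (MB)
conf.setInt("mapreduce.task.io.sort.mb", 100);
// fraction of that buffer that must fill up before a spill is triggered
conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);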
sortAndSpill() is the section of code that runs as the mapper is writing the intermediate values into spill files.
Inside sortAndSpill():
if (spstart != spindex) {
    combineCollector.setWriter(writer);
    RawKeyValueIterator kvIter =
        new MRResultIterator(spstart, spindex);
    combinerRunner.combine(kvIter, combineCollector);
}
This is where combine() is called first. But when the data volume is high, Hadoop cannot wait for all of the mapper's output before spilling data to disk: whenever the in-memory buffer reaches a threshold, it is spilled into a spill file. So it is quite possible that the values for one key end up in two (or more) spill files. When that happens, Hadoop can run the data in the spill files through the combiner for yet another round of aggregation, and that is partly what the mergeParts() function does:
if (combinerRunner == null || numSpills < minSpillsForCombine) {
    Merger.writeFile(kvIter, writer, reporter, job);
} else {
    combineCollector.setWriter(writer);
    combinerRunner.combine(kvIter, combineCollector);
}
This is how combine() can be called twice, or more, for the same key. So even if the mapper always emits "1"s, the combiner can receive values much larger than "1". It is good practice never to make assumptions about the values coming into the combiner based on what map() emits.
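You can see the failure mode without a cluster by simulating repeated combining in plain Java. This is a toy simulation, not Hadoop code:

import java.util.Arrays;
import java.util.List;

public class DoubleCombineDemo {
    // buggy combiner: counts elements (sum = sum + 1)
    static int combineBuggy(List<Integer> counts) {
        int sum = 0;
        for (int ignored : counts) sum = sum + 1;
        return sum;
    }

    // correct combiner: sums values (sum = sum + c)
    static int combineCorrect(List<Integer> counts) {
        int sum = 0;
        for (int c : counts) sum = sum + c;
        return sum;
    }

    public static void main(String[] args) {
        // the mapper emitted five "1"s for one term, split across two spill files
        List<Integer> spill1 = Arrays.asList(1, 1, 1);
        List<Integer> spill2 = Arrays.asList(1, 1);

        // first combine pass runs per spill; second pass runs at merge time
        int buggy = combineBuggy(Arrays.asList(
                combineBuggy(spill1), combineBuggy(spill2)));     // yields 2, not 5
        int correct = combineCorrect(Arrays.asList(
                combineCorrect(spill1), combineCorrect(spill2))); // yields 5

        System.out.println("buggy: " + buggy + ", correct: " + correct);
    }
}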