MapReduce, with inspiration from functional programming

Posted by Jon
on Thursday, August 14

MapReduce is the architecture that Google uses to do things like index the web and calculate PageRank. It’s a somewhat popular topic for developers and bloggers, for two reasons. First, because Google uses it to such dramatic effect, and it’s easy to think that it must be the greatest and most powerful way to handle distributed processing. Second, it is a little hard to understand at first, which means that there is always a market for “intro to MapReduce” blog posts.

The thing is, there is nothing magical about MapReduce. It is fairly simple on the surface, once you understand a few basic concepts (like map and reduce, though MapReduce != map + reduce, as we’ll see soon). It also isn’t the “best” approach to distributed processing, because there are so many types of problems that need distributed processing, and MapReduce is only appropriate for a small subset.

So this post isn’t about DIY MapReduce. Instead, it’s about understanding MapReduce. Specifically, understanding it from a languages standpoint, with reference to map and reduce, rather than understanding it from a systems standpoint.

Iterators and MapReduce

If you understand map and reduce, you’re about a third of the way to understanding MapReduce. But it is also important to note that MapReduce doesn’t even strictly need map or reduce functions, and is implemented at Google in C++ (not exactly a functional language). So map and reduce are more the conceptual foundation of MapReduce, rather than the underlying code.

But still, the MapReduce framework gets its name from these higher-order functions, and the basic pattern is simple:

  1. Split a large problem into smaller chunks, and perform a function on each chunk (map)
  2. Aggregate the output (reduce)

Because map just applies a function to each element of an array independently, with no side effects, order doesn’t matter. You could run map backwards or forwards, and the result would be the same. Therefore, map operations can easily be parallelized over different CPU cores, or across multiple machines. (This is one of the advantages we get from greater abstraction. We could replace map with an each iterator, reduce, or even a for or while loop, but by using map, we know immediately that parallelization is possible.)
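Here is a minimal local sketch of that claim that uses only core Ruby. Mapping a side-effect-free block forwards, backwards, or with one thread per element produces the same array. The threads only stand in for separate cores or machines. This is not how MapReduce actually distributes work.

```ruby
# A side-effect-free function: its result depends only on its argument.
square = lambda { |n| n * n }
numbers = (1..10).to_a

# Process the elements front to back, and back to front.
forwards  = numbers.map(&square)
backwards = numbers.reverse.map(&square).reverse

# Hand each element to its own thread (a stand-in for a separate core or
# machine); Thread#value waits for the thread and returns the block's result.
threaded = numbers.map { |n| Thread.new { square.call(n) } }.map(&:value)

p forwards == backwards  # => true
p forwards == threaded   # => true
```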

MapReduce also parallelizes its reduce stage, even though reduce is not inherently parallelizable. It does this not by parallelizing a single reduce. Instead, it splits the work into many independent reductions (one per key, as we’ll see below) and distributes those reductions across machines. So while the map stage is a single map, distributed over several computers, the reduce stage is multiple reduces. Each one runs on a single machine and is independent of the reductions happening on the other machines.

An example: from local reduce to MapReduce

What kind of problem can be solved with MapReduce? Counting words is a basic example. Let’s say you want to count the number of occurrences of each word in War and Peace. You could do this simply with reduce (also known as inject), returning a hash of key-value pairs that list a word and its number of instances, like {"war" => 225, "peace" => 341}.

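# Read the whole book and split it on whitespace into words.
words = File.read("/path/to/war_and_peace.txt").split

# Fold each word into a running hash of counts; Hash.new(0) makes new words start at 0.
word_counts = words.reduce(Hash.new(0)) do |results, word|
  results[word] += 1
  results
end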
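You can try the same reduce-based count without the book by running it on a short inline string. The sentence below is made up for illustration. On Ruby 2.7 or later, the built-in Enumerable#tally produces the same counts.

```ruby
text = "the cat and the dog and the bird"

# Fold each word into a hash of counts, exactly as in the War and Peace version.
word_counts = text.split.reduce(Hash.new(0)) do |results, word|
  results[word] += 1
  results
end

p word_counts["the"]  # => 3
p word_counts["and"]  # => 2

# Ruby 2.7+ only: Enumerable#tally builds the same word => count hash.
p text.split.tally == word_counts  # => true
```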

This code is not parallelizable: each pass builds on the running hash from the previous pass. Fortunately, it only takes Ruby about a second to count the instances of each word in War and Peace, which means that distributed MapReduce is only needed for much larger problems. But what if we wanted word counts of every book in Project Gutenberg? Or of every page on the entire internet? Or what if our calculation function took longer? There are 574,780 total words in the English translation of War and Peace that I’m using. If each word took a second to process, due to a network call or a complex calculation, it would take more than six and a half days (574,780 seconds) to process the book. Proust would take three weeks. Yikes!

That’s where MapReduce comes in. Instead of processing the entire list with a single reduce, imagine splitting the text of War and Peace into 200 even chunks. These chunks would then be mapped to 200 different servers, with each server doing its own parallel word counting, like this:

# Pseudocode: word_chunks, count_words, and assign_to_server are placeholders.
word_chunks.map do |chunk|
  assign_to_server(count_words(chunk)) # [{"the" => 1}, {"cat" => 1}, {"the" => 1}, {"dog" => 1}]
end
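The pseudocode above leaves word_chunks, count_words, and assign_to_server undefined. Here is a single-process sketch of just the map stage. It assumes that the mapper emits one [word, 1] pair per word, matching the comment above. map_chunk is a hypothetical name, and each_slice stands in for splitting the book into 200 even chunks.

```ruby
# Hypothetical mapper: emits one [word, 1] pair for every word in its chunk.
def map_chunk(words)
  words.map { |word| [word, 1] }
end

words = "the cat saw the dog and the dog saw the cat".split

# Split the word list into chunks of (at most) four words each; in a real
# cluster, each chunk would be sent to a different server.
word_chunks = words.each_slice(4).to_a

mapped = word_chunks.map { |chunk| map_chunk(chunk) }

p word_chunks.size  # => 3
p mapped.first      # => [["the", 1], ["cat", 1], ["saw", 1], ["the", 1]]
```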

In reality, MapReduce works slightly differently. Each chunk is represented as the value in a key-value pair, with the key being the identifier for that chunk. The key might be 1..200 when using 200 even chunks, the ID of a Google File System cluster, or a filename when each server gets a different file. This key is useful for managing a MapReduce operation: if server XYZ goes down, the master program knows that XYZ was handling chunk 19, and we can process chunk 19 again. So what we have is more like this:

word_chunks.each do |chunk_key, words|
  assign_to_server(count_words(chunk_key, words))
end
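To make the role of the chunk key concrete, here is a small simulation in which every name is hypothetical. The chunks are stored under integer keys, and one made-up server ("XYZ") fails. Because the failure is tied to a key, only that chunk is re-run on a spare server. A real master detects dead workers differently (the MapReduce paper describes periodic pings), so the exception here is only a stand-in.

```ruby
# Chunks keyed by an identifier, as in the pseudocode above.
word_chunks = {
  1 => %w[the cat saw the],
  2 => %w[dog and the dog],
  3 => %w[saw the cat]
}

# Hypothetical assignment of chunk keys to servers; "XYZ" is going to fail.
assignments  = { 1 => "server-a", 2 => "XYZ", 3 => "server-b" }
spare_server = "server-c"

# Stand-in for running the mapper on a remote server.
def run_mapper(server, words)
  raise IOError, "#{server} went down" if server == "XYZ"
  words.map { |word| [word, 1] }
end

results = {}
word_chunks.each do |chunk_key, words|
  begin
    results[chunk_key] = run_mapper(assignments[chunk_key], words)
  rescue IOError => e
    # The key says exactly which chunk was lost, so only that chunk is redone.
    puts "#{e.message}; re-running chunk #{chunk_key} on #{spare_server}"
    results[chunk_key] = run_mapper(spare_server, words)
  end
end

p results.keys.sort  # => [1, 2, 3]
```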

You might be asking yourself: if the map phase always creates values of “1” attached to a particular word, so a mapper might end up with [{"the" => 1}, {"the" => 1}, {"the" => 1}], why even use a hash? Why not just create a big nested array of words ([["the","the","the"]]), group by word, and count the elements in each array?

Well, first, MapReduce isn’t always used for word counts, so in another use, the values returned by map might be more significant. Second, this lets us introduce a “combiner” stage between map and reduce to optimize the process. This stage creates a local count for a particular server, which reduces bandwidth and makes life easier for the reduce stage. Without this stage, a mapper might return [{"the" => 1}, {"cat" => 1}, {"the" => 1}, {"dog" => 1}, {"the" => 1}]. That would leave it up to whichever reducer handles “the” to sum these numbers (along with other instances of “the”). But the combiner creates local sums, meaning the mapper will actually return [{"the" => 3}, {"cat" => 1}, {"dog" => 1}].
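A combiner can be sketched as a local reduce over one mapper's pairs. The combine method below has a hypothetical name. Its input is the five-pair mapper output from the paragraph above, written as [word, count] arrays.

```ruby
# Output of one mapper before combining: one pair per word occurrence.
mapper_output = [["the", 1], ["cat", 1], ["the", 1], ["dog", 1], ["the", 1]]

# Hypothetical combiner: sums the values for each key on the mapper's own
# machine, so fewer pairs have to cross the network to the reducers.
def combine(pairs)
  pairs.reduce(Hash.new(0)) do |sums, (word, count)|
    sums[word] += count
    sums
  end
end

combined = combine(mapper_output)
p combined.to_a  # => [["the", 3], ["cat", 1], ["dog", 1]]
```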

When all the distributed word counts finish, the results are grouped by key. So all the “cat” key/value pairs are grouped into one list, and the “dog” key/value pairs are grouped into another list. These are then reduced for a final word count. Our reduce pseudocode might look something like this.

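totals = {}
grouped_results.each do |key, values|
  # key: "cat"
  # values: [1, 3, 12, 9, 1, 2]
  totals[key] = values.sum # Array#sum is built in from Ruby 2.4; use values.reduce(0, :+) on older Rubies
end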

These reductions can be distributed, so each pass through grouped_results can be handled by a different server, because the summing of instances of “war” is completely independent of the summing of instances of “peace”.
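Putting the last two steps together, here is a local sketch of the grouping (often called the shuffle) and the per-key reduce. The mapper outputs are invented for illustration, and threads stand in for separate reduce servers. The point is that no reduction reads another key's values.

```ruby
# Combined output from three hypothetical mappers.
mapper_outputs = [
  { "the" => 3, "cat" => 1, "dog" => 1 },
  { "the" => 2, "cat" => 2 },
  { "dog" => 4, "war" => 1 }
]

# Group: collect every count for a given word into one list.
grouped_results = Hash.new { |hash, key| hash[key] = [] }
mapper_outputs.each do |output|
  output.each { |word, count| grouped_results[word] << count }
end

# Reduce: each word is summed on its own thread, standing in for its own
# server. No thread touches another word's counts, so they cannot interfere.
reducers = grouped_results.map do |word, counts|
  Thread.new { [word, counts.reduce(0, :+)] }
end
totals = reducers.map(&:value).to_h

p grouped_results["cat"]  # => [1, 2]
p totals["the"]           # => 5
p totals["dog"]           # => 5
```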

What’s next?

There is more to MapReduce than this; other pieces handle the fault tolerance, the grouping of keys between the map and reduce stages, etc. And the actual distribution of processing introduces a lot more complexity. If you actually want to use MapReduce, take a look at Hadoop, or a few Ruby distributed processing systems inspired by MapReduce (Skynet, Starfish).

Hopefully this post will give you a conceptual understanding of MapReduce; it’s an interesting and powerful architecture. Just remember that it is not the end-all of distributed processing, and just because it’s appropriate for Google doesn’t mean it is appropriate for you. In fact, MapReduce can only handle a certain class of problems. If you want to distribute video transcoding across multiple machines, for example, MapReduce can’t really help you. Keep in mind too that you shouldn’t reinvent the wheel – if Hadoop can help you, it’s already built and built well. But it never hurts to understand something new.

More Resources

Cluster Computing and MapReduce (video series)

MapReduce whitepaper

Hadoop

Using Ruby with Hadoop

Understanding map and reduce

Posted by Jon
on Monday, August 11

Map and reduce are two of the most important internal iterators in functional programming. But in my experience as a Ruby developer, while map is frequently used, it should be used a bit more; and reduce (== inject) is underused and often misunderstood.

So how do you know when to use map or reduce on a collection? Simple. When iterating through an array, if you don’t want a return value from the operations, use each; and if you’re looking for a return value, use the iterator method that delivers the type of value you want returned. So if you want to take a collection and return a subset of that collection based on some criteria, use select. (See an earlier article for more.) If you want to return a transformed version of each element, use map. And if you want to return any value whatsoever, or a value that doesn’t match another iterator method, use reduce.

As an aside, do reduce and map have anything to do with the MapReduce architecture for distributed processing? Not surprisingly, the answer is “yes,” and I’ll talk more about that later this week.

inject, reduce, fold

One function, three names. If you’re a Ruby user and have access to Ruby 1.8.7 or later, I suggest you forget the name inject altogether. I find it confusing, personally, and as of 1.8.7, inject has another name: reduce. This is much better, and I’ll discuss terminology in a minute (along with a third common name for this function: fold).

reduce takes in an array and reduces it to a single value. It does this by iterating through a list, keeping and transforming a running total along the way. This running total can be a single value (0, 3.7, “abcdefg”), a collection ([], {}), or anything else, really. Each iteration starts with the return value of the previous iteration and does something with it.

Formally, reduce takes three arguments: a collection, an initial value (which is used on the first pass), and a function to apply at each pass through the collection. Here is a Ruby example that uses reduce to sum a series of numbers:

(5..10).reduce(0) do |sum, value|
  sum + value
end

Let’s walk through this example in more detail. Here are the three arguments passed to reduce in this example:

  • Collection: 5, 6, 7, 8, 9, 10
  • Initial Value: 0
  • Function: add current value to running total

Pass   Collection Value   Running Total       Return Value
1      5                  0 (initial value)   5
2      6                  5                   11
3      7                  11                  18
4      8                  18                  26
5      9                  26                  35
6      10                 35                  45

The return value from this function will be 45. At each pass, the function takes two values: the return value from the previous pass (or the initial value, on the first pass) and the current element of the collection. (This is the |sum, value| part of the Ruby example.)
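To watch the passes from the table happen, you can print the running total inside the block. This is just the example above with a hypothetical pass counter added. The last line shows that Ruby's reduce also accepts a symbol naming the operation.

```ruby
pass = 0
total = (5..10).reduce(0) do |sum, value|
  pass += 1
  result = sum + value
  puts "pass #{pass}: value #{value}, running total #{sum}, returns #{result}"
  result # the block's value becomes the next pass's running total
end

p total                  # => 45
p((5..10).reduce(0, :+)) # => 45, same fold with the operation given as a symbol
```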

What would this example look like using each instead of reduce?

sum = 0
(5..10).each do |value|
  sum += value
end
sum

Any time you see this (anti)pattern – initializing a variable, looping to change the variable, and returning the variable – you know you need a new collection function. In this case, reduce does the trick.

Digression on blocks

Personally, while Ruby’s block syntax makes code beautifully readable, I sometimes have trouble keeping track of how this syntax relates to a straightforward functional syntax. After all, I described reduce as taking three arguments: a collection, a starter value, and a function. But in the Ruby example above, I’m only passing one argument (0) to reduce. So if it helps, here is another way to think about reduce, in pseudo-Scheme.


(reduce + 0 (range 5 10))

Here we’re explicitly passing three arguments to reduce: + (the addition operator), 0 (the seed value), and the range of numbers from 5 to 10 (our collection). Remember that (5..10).reduce(0) {|sum, value| sum + value } does exactly the same thing, just rearranged a bit.
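To connect the two forms, here is a hypothetical three-argument reduce_with method (not part of Ruby). It takes the function, the seed, and the collection explicitly, in the same order as the pseudo-Scheme call. It is written with each purely to expose the mechanics.

```ruby
# Hypothetical helper: an explicit three-argument reduce.
def reduce_with(function, initial, collection)
  accumulator = initial
  collection.each do |element|
    # Fold this element into the running total from the previous pass.
    accumulator = function.call(accumulator, element)
  end
  accumulator
end

add = lambda { |a, b| a + b }
block_form = (5..10).reduce(0) { |sum, value| sum + value }

p reduce_with(add, 0, 5..10)               # => 45
p reduce_with(add, 0, 5..10) == block_form # => true
```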

Back on track

Let’s look at a slightly more complicated case. reduce can be used to implement just about any other collection function, from map to sort to select. Here is a way to emulate select using reduce.

(1..10).reduce([]) do |result, value| 
  result << value if value > 5
  result
end
# [6, 7, 8, 9, 10]

You can also emulate map with reduce, like this:

(1..10).reduce([]) do |result, value|
  result << value * value
  result
end
# [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

Of course, you wouldn’t want to do this. Whenever possible, you’re generally better off using a more specific function, like map in this case. If you want to sum numbers, use a sum function instead of reduce. If you want a hash, try build_hash. (I say “generally”, because there are also diminishing returns – creating a new reduce-style iterator for every possible use of reduce is overkill. Use your judgment.)

But this shows you the power of reduce; reduce can be used to implement any other internal iterator. Any time you want to take a collection and return something else – a value, another collection, etc. – reduce is capable.
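As one example of that claim, here is a sort written with reduce. It is a plain insertion sort chosen for illustration, and sort_with_reduce is a hypothetical name. In practice you would simply call sort.

```ruby
# Hypothetical helper: insertion sort expressed as a fold. The running total
# is the already-sorted prefix; each pass inserts one more element into it.
def sort_with_reduce(collection)
  collection.reduce([]) do |sorted, value|
    # Find the first element larger than value (or the end of the array).
    index = sorted.index { |existing| existing > value } || sorted.size
    sorted.insert(index, value) # Array#insert returns the array itself
  end
end

p sort_with_reduce([5, 3, 9, 1, 4])  # => [1, 3, 4, 5, 9]
```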

Why 3+ names?

This function has three names: “inject”, “reduce”, and “fold”. All make sense from one perspective.

  • fold is used by Haskell, Scheme, and OCaml. This name highlights the fact that this function “folds” the return value of one pass into the next pass. Actually, this function is really divided into fold-left and fold-right, referring to the direction of the reduction. Do you start at the left of the list, moving right, or go from right to left? For associative operations (like addition), it doesn’t make a difference: (1 + 2) + 3 == 1 + (2 + 3). But for non-associative operations, like subtraction, division, and exponentiation, order matters: (8 - 4) - 2 != 8 - (4 - 2), and (2^3)^2 != 2^(3^2).
  • reduce, used by Common Lisp, Python, JavaScript, and now Ruby, describes the ultimate goal of the function: reduce a collection to a single return value. But keep in mind that the single return value can be a collection. So reduction has nothing to do with size – a reduce function called on a 10 element array could return a 100 element array, or it could return a single integer, or a hash, or something else.
  • inject, the Smalltalk name for this function (and the dominant Ruby name until recently), is my least favorite. I think it refers to “injecting” the return value of the previous function call into the next function call, but I could be wrong.
  • If it helps, you can even think of this function as accumulate, which is what C++ calls it. This name is generally appropriate: accumulate a return value by iterating through a collection. Just remember that there isn’t actually a “global” accumulated variable that is carried over through each function call, and returned at the end; each pass just folds its return value into the next pass. That’s it.
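Ruby's reduce folds from the left. A right fold can be sketched by walking the collection in reverse and passing the element as the left operand. fold_left and fold_right here are hypothetical helpers, not core Ruby methods. With addition the direction makes no difference, but with subtraction it does.

```ruby
# Left fold: ((initial op x1) op x2) op x3, which is what Ruby's reduce does.
def fold_left(collection, initial)
  collection.reduce(initial) { |acc, element| yield(acc, element) }
end

# Right fold: x1 op (x2 op (x3 op initial)), built by starting from the end.
def fold_right(collection, initial)
  collection.reverse.reduce(initial) { |acc, element| yield(element, acc) }
end

numbers = [1, 2, 3]

left_sum         = fold_left(numbers, 0)  { |a, b| a + b }
right_sum        = fold_right(numbers, 0) { |a, b| a + b }
left_difference  = fold_left(numbers, 0)  { |a, b| a - b }  # ((0 - 1) - 2) - 3
right_difference = fold_right(numbers, 0) { |a, b| a - b }  # 1 - (2 - (3 - 0))

p [left_sum, right_sum]                # => [6, 6]
p [left_difference, right_difference]  # => [-6, 2]
```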

So that’s reduce. If you’re having trouble getting your mind around it, I recommend reading up a bit more, because it is an important concept. It is also important to understanding MapReduce.

map

map takes an array, applies a function to each element, and returns a new array with the results. Here is its equivalent using each.

email_addresses = []
users.each do |user|
  email_addresses << user.email
end
email_addresses

We can improve upon this using map.

users.map do |user|
  user.email
end

This is quite a bit simpler than reduce, and I’m not going to spend much time on it. If you’re an experienced Ruby programmer, you’ve probably used map hundreds of times. If it’s new to you, just remember that map takes an array and returns an array of exactly the same size. And think of some practical uses of map:

  • Convert [1, 2, 3, 4, 5] to ["one", "two", "three", "four", "five"]
  • Convert ["Jon Dahl", "Luke Francl", "Eric Chapweske"] to ["Jon", "Luke", "Eric"]
  • Convert ["72%", "1%", "50%"] to [0.72, 0.01, 0.5]
  • Convert Tag.find(:all) to ["<span class='small-tag'>Ruby</span>", "<span class='large-tag'>Merb</span>", "<span class='small-tag'>Perl</span>"]
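Here are the first three conversions as map calls. The number-to-word lookup table is an assumption made for this sketch. Note that each result has exactly as many elements as its input.

```ruby
# Convert [1, 2, 3, 4, 5] to words via a lookup table (assumed for this sketch).
number_words = %w[zero one two three four five six seven eight nine]
words = [1, 2, 3, 4, 5].map { |n| number_words[n] }
p words  # => ["one", "two", "three", "four", "five"]

# Keep only the first name from each full name.
names = ["Jon Dahl", "Luke Francl", "Eric Chapweske"]
first_names = names.map { |name| name.split(" ").first }
p first_names  # => ["Jon", "Luke", "Eric"]

# String#to_f reads the leading number and ignores the trailing "%".
percentages = ["72%", "1%", "50%"]
fractions = percentages.map { |pct| pct.to_f / 100 }
p fractions  # => [0.72, 0.01, 0.5]
```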

Other functions

These aren’t the only important iterator functions, by any means. But map, reduce, and select are among the most important. Get them solidly under your belt, and you’ll write better code. They’ll also help you from a conceptual standpoint. MapReduce isn’t exactly map + reduce; it can even be implemented in languages that don’t have map or reduce capabilities. But map and reduce form its conceptual foundation, and MapReduce works because of specific properties of map and reduce. More on that later this week.

Functional programming and looping

Posted by Jon
on Tuesday, July 29

If you’re a programmer, you’ve probably worked through one or more books teaching you the syntax of a new language. I’ve had this experience with half a dozen languages, like C, JavaScript, and Perl. These books typically introduce loops midway through the syntax discussion, after datatypes and control flow, but before I/O and advanced features.

Loops are almost always presented according to this formula.

  • Inane intro text: “What if you want to do an operation more than once?”
  • Introduce while loop, with difference between do while and while do.
  • Introduce for loop, the while loop’s crazy cousin.
  • (Bonus) Introduce foreach loop if language is sufficiently high-level. And that’s it – you know how to loop through code; time to move on.

Not so fast. If you’re lucky enough to use a language that draws from functional programming, you shouldn’t loop like this.

The point

From now on, I’m going to use Ruby for examples, but this article isn’t about Ruby. It is about transitioning from primitive loops to iterating through collections, and from generic collection functions (like each) to more specific functions (like map).

From loops to array traversal

For the last several months, I’ve been working on Tumblon, a medium-sized Rails application. I’ve worked on 15-20 Ruby applications over the last three years, probably totaling 50,000 lines of Ruby code.

I’ve only used a primitive loop once.

That primitive loop was a loop {} loop, forever polling a task list looking for jobs. In other words, a loop with no exit condition beyond ^C or a server crash. As far as I know, Ruby doesn’t have a C-style for loop at all, which would explain why I haven’t used one. It has a foreach loop (for item in arr), but that’s syntactic sugar for arr.each {}.

So the first reason why I’ve only used a simple loop in one case: the each concept is usually a better option. Its Ruby implementation will be familiar to anyone who’s seen Ruby code before:

["horse", "pig", "cow"].each do |animal|
  puts "Old MacDonald has a #{animal}"
end

(Yes, I have a small child.)

This is far cleaner than its for or while loop alternatives. And it is a better abstract representation of what we’re doing: we aren’t looping with an exit condition, we are iterating through an array. But what if you want to do something a fixed number of times? Even that can be understood as traversing a list, like [1,2,3,4,5,6,7,8,9,10].each {}. Of course, Ruby provides a cleaner version: 10.times {}.
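Here is a quick check of that equivalence. Integer#times counts from 0, so 10.times traverses the same values as the range 0...10. Called without a block, times returns an enumerator, which lets you map over it. The squaring and the "Verse" output are just illustrative choices.

```ruby
# Integer#times without a block returns an Enumerator over 0..9.
from_times = 10.times.map { |i| i * i }
from_range = (0...10).map { |i| i * i }

p from_times == from_range  # => true

# Integer#upto counts from a starting number instead of from 0.
1.upto(3) { |n| puts "Verse #{n}" }
```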

So if your loop is working through a list of some sort, each is a better abstraction of the problem. And in my experience building Ruby applications, every loop but one has been traversing a list. Parsing XML? Traversing a collection. Summing numbers? Traversing a collection. Reading in a text file? Listening to STDIN? Working with rows in a database? Traversing a collection. That’s what each loops do well.

Beyond arr.each

But each isn’t the final word. It is a step up from a primitive for or while loop when working with a collection of values, but many each loops should be replaced with other array methods, like map, inject, and select.

When is each useful? Simple: when you want to create side-effects, like saving to the database, printing a result, or sending a web service call. In these cases, you’re not concerned with the return value; you want to change state on the screen, the disk, the database, or something else. Take a look at this code.

User.find(:all).each do |user|
  Notification.deliver_email_newsletter(user)
end

You don’t need a return value from this – you need emails to be delivered.

But don’t use each if you want to extract some new value from an array. That’s not what it’s for. Instead, take a look at three other powerful functions: map, inject, and select. To see why, let’s take a look at select. Here is code that takes in an array and creates a new array from elements that match a certain condition, using each.

active_users = []
users.each do |user|
  active_users << user if user.active?
end
active_users

Man, the first and last lines are ugly. Why do you have to initialize and return active_users? Answer: because this is a misuse of each. You are much better off using select (or its equivalent, find_all):

users.select do |user|
  user.active?
end

Using select is shorter, easier to understand, and less bug-prone. And more importantly, it clearly encapsulates one common use of each (and looping in general).

Two other key functions – map and inject (or reduce) – complement select and follow a similar pattern. And not surprisingly, they form the foundation of the MapReduce approach to distributed processing. I’ve written more about map and reduce in another article, and here is shorthand for knowing which of these functions to use:

Desired Return Value                               Function
New array with the same number of values           map
New array composed of part of the old array        select
Single value (though this value can be an array)   inject
None                                               each
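Here is each row of the table applied to one small collection. The Person struct and its data are made up for this sketch. They are not the ActiveRecord User model used earlier.

```ruby
# A made-up record type and data, just for this sketch.
Person = Struct.new(:name, :email, :active)

people = [
  Person.new("Jon", "jon@example.com", true),
  Person.new("Luke", "luke@example.com", false),
  Person.new("Eric", "eric@example.com", true)
]

# map: a new array with the same number of values.
emails = people.map { |person| person.email }

# select: a new array made of part of the old one.
active_people = people.select { |person| person.active }

# inject: a single value (here, a count).
active_count = people.inject(0) { |count, person| person.active ? count + 1 : count }

# each: no useful return value, only side effects such as printing.
people.each { |person| puts "Notifying #{person.name}" }

p emails.size                # => 3
p active_people.map(&:name)  # => ["Jon", "Eric"]
p active_count               # => 2
```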

The point, redux

Use each for changing state. Otherwise, avoid side-effects and use “functional” array methods that return a value. Simple. Your code will be cleaner and less bug-prone.

And remember the dead giveaway:

  1. Initialize an empty value, or array, or whatever (new_arr = [])
  2. arr.each, changing the initialized value
  3. Return the value (return new_arr)

Whenever you see this pattern, you know you’ve got an each loop that needs swapping out.

(Edit: I’ve posted a follow-up article with more about map and reduce.)

MapReduce at RailsConf Europe

Posted by Jon
on Thursday, July 03

This September, I’ll be presenting at RailsConf Europe on EC2, MapReduce, and Distributed Processing. The talk will explain the MapReduce approach to distributed processing, will show a few example implementations, and will discuss MapReduce vs. other distributed processing techniques.

Whether you’ll be there or not, if you’re interested in learning more about MapReduce, here are some resources. I’ll write a few more posts on the subject before the conference, so watch this space as well.

Cluster Computing and MapReduce is a great series of video lectures given to Google interns in 2007. The first two are the most appropriate: the first introduces distributed processing concepts, while the second covers MapReduce itself.

MapReduce: Simplified Data Processing on Large Clusters is the paper by Jeffrey Dean and Sanjay Ghemawat of Google that got things going in the first place.

MapReduce for Ruby: Ridiculously Easy Distributed Programming discusses MapReduce and introduces Starfish, a Ruby library for distributed processing. Starfish is not a MapReduce implementation, however – it takes a somewhat different approach to distributed processing.

Skynet (a few writeups: InfoQ, Dion Almaer) is another Ruby-based distributed processing system inspired by MapReduce.

Writing Ruby Map-Reduce programs for Hadoop discusses using Ruby to wrap Hadoop, a MapReduce-like system built in Java.

Introduction to Parallel Programming and MapReduce at Google Code University is a good overview of distributed processing and the MapReduce approach.

And finally, one article that you should avoid:

MapReduce: A major step backwards compares MapReduce to relational databases. It says that MapReduce loses out because it doesn’t support database indices, database views, Crystal Reports, etc. Basically, the complaint is that MapReduce isn’t SQL compliant. WTF? Clearly, the author(s) didn’t understand what MapReduce is. The problem, as explained elsewhere, is that the authors thought that MapReduce == CouchDB/SimpleDB, which is obviously not true. Run %s/MapReduce/SimpleDB over the original article and it makes some sense. But long story short, this article will teach you nothing about MapReduce, and will likely confuse you further. So stay away.