Sunday, September 30, 2018

Simulating MapReduce in Mathematica

MapReduce is one of the most important programming models in big data. Perhaps its biggest draw is that it can be distributed across commodity hardware to crunch enormous amounts of data, which puts supercomputing in the hands of those without specialized parallel hardware. For a Hadoop class I took, I implemented a basic MapReduce simulator in Mathematica to illustrate how MapReduce works. I also got some extra credit for it.

In a nutshell, a MapReduce system has at least two stages: mapping and reducing. In the mapping stage, a function is applied to every piece of data fed into the system. The function itself can be anything; what matters is that it can be parallelized across multiple machines, with each worker handling its own chunk of the data. The map results then get reduced into some kind of summary, such as a histogram or an average. What's important is that the reducing operation is commutative and associative, so the order and grouping in which partial results get combined doesn't matter.
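To make that concrete, here's a toy sketch in the Wolfram Language (the chunking is just pretend parallelism): four "workers" each total their own chunk of the data, and since addition is commutative and associative, the partial sums can be combined in any order.
(*Toy sketch: four "workers" each total their own chunk of the data*)
chunks = Partition[Range[100], 25];
partials = Map[Total, chunks];  (*map stage: one partial sum per worker*)
Total[partials]  (*reduce stage: 5050, regardless of combination order*)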

The MapReduce paradigm can be used from any language that supports reading and writing standard file descriptors. Some languages can abstract this away, especially ones that treat functions as first-class values: if you can pass one function as a mapper and another as a reducer, the abstraction can take care of mapping your map function over the data coming from some input and writing the results to some output (such as a pipe to a reducer).
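As a rough sketch of what such an abstraction could look like in the Wolfram Language (the name mapReduce and its shape are mine, not any package's API):
(*Hypothetical higher-order mapReduce: wires a mapper and a reducer together*)
mapReduce[mapper_, reducer_, data_] :=
  KeyValueMap[reducer, GroupBy[mapper /@ data, First -> Last]];

(*Example: find the longest word for each starting letter*)
mapReduce[
  Function[w, {StringTake[w, 1], w}],
  Function[{k, vs}, {k, First@MaximalBy[vs, StringLength]}],
  {"apple", "avocado", "banana", "blueberry"}]
(*{{"a", "avocado"}, {"b", "blueberry"}}*)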

The Wolfram Language is a symbolic language, so it almost has to treat functions as first-class data types. There is a HadoopLink package for Mathematica that handles the input/output tasks as well as job submission. It also has some functions for interfacing with HDFS. But for right now, we're in the business of simulation.

Let's create a sample problem: say I'm given a bunch of year-temperature pairs and I want to find the maximum temperature in each year. I created a dummy dataset by running
data = Table[{RandomInteger[{1950, 2010}], RandomReal[{60, 90}]}, {i, 1, 100}]
which yields a list of year-temperature pairs. According to the HadoopLink documentation, a mapping function must be a pure function that takes two arguments: a key and a value. In our simple case, the mapper just emits the key and value unchanged so the pairs can be grouped and reduced (to find the maximum).
Mapper = Function[{k, v}, Yield[k, v]];
The reducer takes the list of values for a given key and simply returns the maximum. In Mathematica, we can pass that whole list straight to the Max function.
Reducer = Function[{k, vs}, Yield[k, Max[vs]]];
After that, you'd put the files on HDFS and run the job using HadoopMapReduceJob. The actual package walkthrough is at http://blog.wolfram.com/2013/07/31/mathematica-gets-bigdata-with-hadooplink/. The package handles all of the streaming and the data type conversions. In a real Hadoop job, the Mathematica code would run over a MathLink connection (a bridge between Java and Mathematica), Hadoop itself would handle the grouping by key for the reduce phase and pass back only the results that were needed, and the resulting key-value pairs would be written to HDFS, from where you could import them back into Mathematica.

But here we're interested in just the simulation, as a proof of concept that the pipeline actually does what it's supposed to. The mapper and reducer work exactly as they would if you submitted them with HadoopMapReduceJob.
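To see what that grouping step does, here's the shuffle in miniature (this is just illustrative Wolfram Language, not HadoopLink code): emitted pairs are collected into one list of values per key.
(*Hadoop's shuffle in miniature: collect one list of values per key*)
GroupBy[{{2000, 71.2}, {2001, 65.0}, {2000, 88.9}}, First -> Last]
(*<|2000 -> {71.2, 88.9}, 2001 -> {65.0}|>*)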

Here's the full code, ending with a Grid of the sorted output. It should generalize fairly well to other mappers and reducers.
data = Table[{RandomInteger[{1950, 2010}], RandomReal[{60, 90}]}, {i, 1, 100}];
Mapper = Function[{k, v}, Yield[k, v]];
Reducer = Function[{k, vs}, Yield[k, Max[vs]]];

(*MapReduce Simulator*)
(*Yield stands in for Hadoop's emit: it just builds a {key, value} pair*)
Yield[x_, y_] := {x, y};

Print["Mapping over data"];
(*Map stage: apply the mapper to every {key, value} pair*)
mappedData = Map[Apply[Mapper, #] &, data];
Print["Map stage produced ", Length[mappedData], " pairs"];

(*Group the mapped pairs by key: an association of key -> list of values*)
groupedData = Map[Last, GroupBy[mappedData, First], {2}];
(*Convert the association to a list of {key, values} pairs, which the reducer expects*)
groupedData = Partition[Riffle[Keys@groupedData, Values@groupedData], 2];

Print["Sending ", Length[groupedData], " pairs to reducer"];
(*Reduce stage: apply the reducer to each {key, values} pair*)
reducedData = Map[Apply[Reducer, #] &, groupedData];

Print["Got ", Length[reducedData], " pairs back from reducer."];
Sort[reducedData] // Grid
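
As a quick sanity check (my addition, not part of the simulator proper), we can compare the simulator's output against a direct per-year maximum and confirm the two agree:
(*Compute the per-year maximum directly and compare with the simulator*)
direct = KeyValueMap[List, GroupBy[data, First -> Last, Max]];
Sort[direct] === Sort[reducedData]  (*True*)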

