MapReduce
MapReduce is a programming model introduced by Google Inc. for concurrent computations over large amounts of data (several petabytes) on computer clusters. MapReduce is also the name of an implementation of the programming model in the form of a software library.
With the MapReduce procedure, the data is processed in three phases (Map, Shuffle, Reduce), two of which are specified by the user (Map and Reduce). This allows calculations to be parallelized and distributed over several computers. In the case of very large amounts of data, parallelization may be necessary because the amounts of data are too large for a single process (and the executing computer system).
The programming model was inspired by the map and reduce functions, which are frequently used in functional programming, even though the library works differently from them. In 2010 a US patent was granted for MapReduce. The main contribution of MapReduce, however, is the underlying system, which heavily parallelizes the calculations, optimizes the reorganization of the data in the shuffle step, and can react automatically to errors in the cluster, such as the failure of entire nodes.
Working method
Illustration of the data flow
The above picture illustrates the data flow in the MapReduce calculation.
- Map phase:
- The input data (D, A, T, A) are distributed over a number of map processes (illustrated by colored rectangles), each of which evaluates the map function provided by the user.
- The map processes are ideally carried out in parallel.
- Each of these map instances stores intermediate results (illustrated by pink stars).
- From each map instance, data flows into possibly different intermediate result memories.
- Shuffle phase:
- The intermediate results are redistributed according to the output keys produced by the map function, so that all intermediate results with the same key are processed on the same computer system in the next step.
- Reduce phase:
- For each set of intermediate results, exactly one reduce process (illustrated by violet rectangles) evaluates the reduce function provided by the user and thus produces the output data (illustrated by violet circles X, Y and Z).
- Ideally, the reduce processes are also carried out in parallel.
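The three phases described above can be sketched in a few lines of sequential Python. This is only a single-process illustration under simplifying assumptions, not the actual library: the helper name `map_reduce` and its parameters are hypothetical, and a real system would distribute the map and reduce calls over many machines.

```python
from collections import defaultdict

def map_reduce(inputs, map_fn, reduce_fn):
    """Sequential sketch of the Map, Shuffle and Reduce phases (hypothetical helper)."""
    # Map phase: each input pair yields a list of intermediate (key, value) pairs.
    intermediate = []
    for key, value in inputs:
        intermediate.extend(map_fn(key, value))

    # Shuffle phase: group the intermediate values by their output key.
    groups = defaultdict(list)
    for out_key, out_value in intermediate:
        groups[out_key].append(out_value)

    # Reduce phase: one reduce call per key; results are collected as pairs.
    output = []
    for out_key in sorted(groups):
        for result in reduce_fn(out_key, groups[out_key]):
            output.append((out_key, result))
    return output

# Usage with the input letters from the illustration: count each letter.
letters = [(0, "D"), (1, "A"), (2, "T"), (3, "A")]
counts = map_reduce(
    letters,
    map_fn=lambda position, letter: [(letter, 1)],
    reduce_fn=lambda letter, ones: [sum(ones)],
)
print(counts)  # [('A', 2), ('D', 1), ('T', 1)]
```

Because every map call and every reduce call depends only on its own arguments, the two loops over `inputs` and over `groups` are the places where a distributed implementation would run work in parallel.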
Definition of the MapReduce function
The MapReduce library implements a function that computes a new list of key-value pairs (output list) from a list of key-value pairs (input list):

$$\mathrm{MapReduce} : (K \times V)^* \to (L \times W)^*$$
$$[(k_1, v_1), \ldots, (k_n, v_n)] \mapsto [(l_1, w_1), \ldots, (l_m, w_m)]$$

Explanation:
- The sets $K$ and $L$ contain keys, the sets $V$ and $W$ contain values.
- All keys $k \in K$ are of the same type, e.g. strings.
- All keys $l \in L$ are of the same type, e.g. integers.
- All values $v \in V$ are of the same type, e.g. atoms.
- All values $w \in W$ are of the same type, e.g. floating point numbers.
- If $A$ and $B$ are sets, then $A \times B$ denotes the set of all pairs $(a, b)$ with $a \in A$ and $b \in B$ (Cartesian product).
- If $M$ is a set, then $M^*$ denotes the set of all finite lists with elements from $M$ (based on the Kleene star); the list can also be empty.
Definition of the map and reduce functions
The user configures the library by providing the two functions Map and Reduce, which are defined as follows:

$$\mathrm{Map} : K \times V \to (L \times W)^*, \quad (k, v) \mapsto [(l_1, x_1), \ldots, (l_r, x_r)]$$
and
$$\mathrm{Reduce} : L \times W^* \to W^*, \quad (l, [y_1, \ldots, y_s]) \mapsto [w_1, \ldots, w_m]$$
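As an illustration, the two signatures can be instantiated for a word count. The symbol names are an assumption made for this sketch: $K$ and $V$ stand for the sets of input keys and values, $L$ and $W$ for the sets of output keys and values.

```latex
\begin{align*}
K &= \text{document names}, \quad V = \text{document texts}, \quad L = \text{words}, \quad W = \mathbb{N} \\
\mathrm{Map}(k, v) &= [(l_1, 1), \ldots, (l_r, 1)] \quad \text{where } l_1, \ldots, l_r \text{ are the words of } v \\
\mathrm{Reduce}(l, [x_1, \ldots, x_s]) &= [\, x_1 + \cdots + x_s \,]
\end{align*}
```

Here Map ignores the document name $k$ and emits one pair per word occurrence, while Reduce returns a one-element list containing the sum of the collected values for the word $l$.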
Map phase
- Map maps a pair consisting of a key k and a value v to a list of new pairs (l, x), which play the role of intermediate results. The values x are of the same type as the final results w.
- For a new pair (l, x), the key l assigned by Map refers to a list T_l of intermediate results in which the value x calculated by Map is collected.
- The library calls the Map function for each pair in the input list.
- All these map calculations are independent of each other, so that they can be carried out concurrently and distributed on a computer cluster.
Shuffle phase
- Before the Reduce phase can start, the results of the Map phase must be grouped into lists T_l according to their new key l.
- If map and reduce functions are carried out concurrently and distributed, a coordinated data exchange is necessary for this.
- The performance of a MapReduce system depends largely on how efficiently the shuffle phase is implemented.
- As a rule, the user influences the shuffle phase only through the design of the key l. It is therefore sufficient to optimize the shuffle phase well once, and numerous applications can benefit from this.
Reduce phase
- Once all map calls have been made, that is, once all intermediate results are available in the lists T_l, the library calls the Reduce function for each intermediate value list T_l. Reduce uses the list to calculate a list of result values w_j, which the library collects as pairs (l, w_j) in the output list.
- The calls to Reduce can also be distributed independently to different processes in the computer cluster.
Note: This description is somewhat simplified, because the control of the MapReduce procedure will usually aim for a fixed number R of reduce processes. If there are intermediate results for more than R different keys l, intermediate results with different keys are therefore stored in a common list. The corresponding pairs are sorted by key before the reduce calculation.
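The note above can be illustrated with a small partitioning sketch. It assumes that keys are assigned to a fixed number of reduce processes by hashing, which is one common scheme and not necessarily what a given library does; the function names `partition` and `shuffle` are hypothetical.

```python
import zlib
from collections import defaultdict

def partition(key, num_reducers):
    """Assign a key to one of num_reducers partitions (assumed hash scheme)."""
    # crc32 is used instead of hash() so the assignment is stable across runs.
    return zlib.crc32(key.encode("utf-8")) % num_reducers

def shuffle(intermediate_pairs, num_reducers):
    """Distribute (key, value) pairs over partitions and sort each one by key."""
    partitions = defaultdict(list)
    for key, value in intermediate_pairs:
        partitions[partition(key, num_reducers)].append((key, value))
    # Each reduce process sorts its pairs so that equal keys become adjacent.
    return {index: sorted(pairs) for index, pairs in partitions.items()}

pairs = [("der", 1), ("die", 1), ("muß", 1), ("der", 1), ("von", 1), ("die", 1)]
by_reducer = shuffle(pairs, num_reducers=2)
for index, assigned in sorted(by_reducer.items()):
    print(index, assigned)
```

With four distinct keys and only two reduce processes, at least one partition holds pairs for several keys; sorting keeps all pairs with the same key next to each other so that the reduce function can still be called once per key.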
Combine phase
Optionally, a combine phase can take place before the shuffle phase. It usually has the same functionality as the reduce function but is executed on the same node as the map phase. The aim is to reduce the amount of data that has to be processed in the shuffle phase and thus to reduce the network load. The benefit of the combine phase is immediately apparent in the word count example: because words occur with very different frequencies in natural language, a text would very often produce an output of the form ("and", 1) (the same applies to articles and auxiliary verbs). The combine phase turns 100 messages of the form ("and", 1) into one message of the form ("and", 100). This can significantly reduce the network load, but it is not possible in all use cases.
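The effect of the combine phase can be shown with a minimal sketch. The function names `map_words` and `combine` are hypothetical, and the combiner simply sums counts locally on the map node, as described above.

```python
from collections import Counter

def map_words(name, document):
    """Map function of the word count example: one (word, 1) pair per word."""
    return [(word, 1) for word in document.split()]

def combine(pairs):
    """Local pre-aggregation on the map node (hypothetical combiner)."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return sorted(totals.items())

raw = map_words("doc", "and " * 100 + "or")
combined = combine(raw)
print(len(raw), len(combined))  # 101 2
print(combined)                 # [('and', 100), ('or', 1)]
```

The pre-aggregation is safe here because addition is associative and commutative, so summing partial sums gives the same result as summing all the ones at the reducer. A reduce function without these properties cannot simply be reused as a combiner.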
Example: Distributed frequency analysis with MapReduce
Problem
The goal is to determine how often each word occurs in a large body of text.
Specification of the map and reduce functions
map(String name, String document):
    // name: document name ("key")
    // document: document contents ("value")
    for each word w in document:
        EmitIntermediate(w, 1);

reduce(String word, Iterator partialCounts):
    // word: a word ("key")
    // partialCounts: a list of aggregated partial counts ("values")
    //     for 'word'
    int result = 0;
    for each v in partialCounts:
        result += v;
    Emit(word, result);
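The pseudocode above can be transcribed into runnable Python roughly as follows. This is a sketch: the function names are hypothetical, `yield` stands in for EmitIntermediate and Emit, and splitting on whitespace is an assumed definition of a word.

```python
def map_word_count(name, document):
    """name: document name (key); document: document contents (value)."""
    for word in document.split():
        yield (word, 1)  # corresponds to EmitIntermediate(w, 1)

def reduce_word_count(word, partial_counts):
    """word: a word (key); partial_counts: iterable of counts for that word."""
    result = 0
    for count in partial_counts:
        result += count
    yield (word, result)  # corresponds to Emit(word, result)

print(list(map_word_count("doc1", "to be or not to be")))
print(list(reduce_word_count("to", [1, 1])))  # [('to', 2)]
```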
Map phase
- Map is given a document name (name) and a document (document) as a character string.
- Map traverses the document word by word.
- Each time a word w is encountered, a 1 is appended to the intermediate result list for w (the list is created if it does not yet exist).
- Once all words have been processed and the text contains n distinct words, the map phase ends with n intermediate result lists, one per word, each containing as many 1-entries as the number of times that word occurred in the document.
- Many map instances may run at the same time if several documents were passed to the library.
Shuffle phase
- The intermediate result lists that several processes or systems hold for the same word w are merged and distributed to the systems running the reducers.
Reduce phase
- Reduce is called for a word (word) and the list of intermediate results (partialCounts).
- Reduce runs through the list of intermediate results and adds up all the numbers found.
- The sum (result) is returned to the library; it is the number of times the word was found in all documents.
- The intermediate results can be processed in parallel by simultaneous Reduce calls.
Overall
- A list of words and word frequencies is generated from a list of document names and documents.
Exemplary calculation
For example, the following calculation would be conceivable on a classic text:
Text = "Fest gemauert in der Erden
Steht die Form, aus Lehm gebrannt.
Heute muß die Glocke werden,
Frisch, Gesellen! seid zur Hand.
Von der Stirne heiß
Rinnen muß der Schweiß,
Soll das Werk den Meister loben,
Doch der Segen kommt von oben."
The text is divided into sentences; it is advisable to normalize it by writing everything in lower case and removing the punctuation marks:
Eingabeliste = [ (satz_1, "fest gemauert in der erden steht die form aus lehm gebrannt"),
(satz_2, "heute muß die glocke werden frisch gesellen seid zur hand"),
(satz_3, "von der stirne heiß rinnen muß der schweiß soll das werk den meister loben doch der segen kommt von oben") ]
The input list has three pairs as elements, so we can start three map processes:
P1 = Map(satz_1, "fest gemauert in der erden steht die form aus lehm gebrannt")
P2 = Map(satz_2, "heute muß die glocke werden frisch gesellen seid zur hand")
P3 = Map(satz_3, "von der stirne heiß rinnen muß der schweiß soll das werk den meister loben doch der segen kommt von oben")
The map calls generate these intermediate result pairs:
P1 = [ ("fest", 1), ("gemauert", 1), ("in", 1), ("der", 1), ("erden", 1),
("steht", 1), ("die", 1), ("form", 1), ("aus", 1), ("lehm", 1),
("gebrannt", 1) ]
P2 = [ ("heute", 1), ("muß", 1), ("die", 1), ("glocke", 1), ("werden", 1),
("frisch", 1), ("gesellen", 1), ("seid", 1), ("zur", 1), ("hand", 1) ]
P3 = [ ("von", 1), ("der", 1), ("stirne", 1), ("heiß", 1), ("rinnen", 1),
("muß", 1), ("der", 1), ("schweiß", 1), ("soll", 1), ("das", 1),
("werk", 1), ("den", 1), ("meister", 1), ("loben", 1), ("doch", 1),
("der", 1), ("segen", 1), ("kommt", 1), ("von", 1), ("oben", 1) ]
The map processes deliver their pairs to the MapReduce library, which collects them in the intermediate result lists. The following could happen in parallel. (Identical timing of the three map processes is unrealistic; in practice their executions overlap. The T_word lists exist locally for each map process and are not synchronized between the steps.)
1. Iteration:
P1: T_fest = [ 1 ] (new)
P2: T_heute = [ 1 ] (new)
P3: T_von = [ 1 ] (new)
2. Iteration:
P1: T_gemauert = [ 1 ] (new)
P2: T_muß = [ 1 ] (new)
P3: T_der = [ 1 ] (new)
3. Iteration:
P1: T_in = [ 1 ] (new)
P2: T_die = [ 1 ] (new)
P3: T_stirne = [ 1 ] (new)
In the fourth step you can see that lists of intermediate results exist locally for each map process and cannot be reused globally:
4. Iteration:
P1: T_der = [ 1 ] (new; the 1st map process does not yet have a T_der, only P3 does)
P2: T_glocke = [ 1 ] (new)
P3: T_heiß = [ 1 ] (new)
5. Iteration:
P1: T_erden = [ 1 ] (new)
P2: T_werden = [ 1 ] (new)
P3: T_rinnen = [ 1 ] (new)
6. Iteration:
P1: T_steht = [ 1 ] (new)
P2: T_frisch = [ 1 ] (new)
P3: T_muß = [ 1 ] (new; the 3rd map process does not yet have a T_muß, only P2 does)
In the seventh step, a further occurrence is for the first time collected in an intermediate result list that already exists:
7. Iteration:
P1: T_die = [ 1 ] (new; the 1st map process does not yet have a T_die)
P2: T_gesellen = [ 1 ] (new)
P3: T_der = [ 1, 1 ] (reuses the list that has existed in the 3rd map process since iteration 2)
etc.
After 20 steps, all three map processes have finished their work; the map phase ends and the reduce phase begins. The intermediate result lists that were created by different map processes for the same word are merged. For each of the resulting intermediate result lists (listed here in sorted order)
                                reduce
T_der      = [ 1 ] ++ [ 1, 1, 1 ] -> [ 4 ]
T_die      = [ 1 ] ++ [ 1 ]       -> [ 2 ]
T_fest     = [ 1 ]                -> [ 1 ]
T_gemauert = [ 1 ]                -> [ 1 ]
T_glocke   = [ 1 ]                -> [ 1 ]
T_heiß     = [ 1 ]                -> [ 1 ]
T_heute    = [ 1 ]                -> [ 1 ]
T_in       = [ 1 ]                -> [ 1 ]
T_muß      = [ 1 ] ++ [ 1 ]       -> [ 2 ]
T_stirne   = [ 1 ]                -> [ 1 ]
T_von      = [ 1, 1 ]             -> [ 2 ]
.
.
. (for all distinct T lists)
a reduce process can be started in parallel that adds up the elements. The result of MapReduce looks something like this:
Ausgabeliste = [ ("fest", 1), ("heute", 1), ("von", 2), ("gemauert", 1),
("muß", 2), ("der", 4), ("in", 1), ("die", 2), .. ]
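The worked example can be reproduced with a short script. This is a sketch under stated assumptions: three threads stand in for the three map processes, the helper name `map_fn` is hypothetical, and each map call builds its own local T lists, which are merged only afterwards.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

eingabeliste = [
    ("satz_1", "fest gemauert in der erden steht die form aus lehm gebrannt"),
    ("satz_2", "heute muß die glocke werden frisch gesellen seid zur hand"),
    ("satz_3", "von der stirne heiß rinnen muß der schweiß soll das werk "
               "den meister loben doch der segen kommt von oben"),
]

def map_fn(pair):
    """Build the local intermediate result lists (T_word) for one sentence."""
    name, sentence = pair
    local = defaultdict(list)
    for word in sentence.split():
        local[word].append(1)
    return local

# Map phase: one worker per input pair, each with its own local lists.
with ThreadPoolExecutor(max_workers=3) as pool:
    local_lists = list(pool.map(map_fn, eingabeliste))

# Shuffle: merge the local lists that belong to the same word.
merged = defaultdict(list)
for local in local_lists:
    for word, ones in local.items():
        merged[word].extend(ones)

# Reduce: add up the entries of each merged list.
ausgabeliste = sorted((word, sum(ones)) for word, ones in merged.items())
counts = dict(ausgabeliste)
print(counts["der"], counts["die"], counts["muß"], counts["von"])  # 4 2 2 2
```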
Further examples
Procedure | Map function | Reduce function |
---|---|---|
Distributed grep | Writes each matching line (hit) to intermediate storage | Passes the data through (identity mapping, more precisely: projection onto the 2nd component) |
Sales evaluation | Writes the article number and the amount for each receipt to intermediate storage | Adds up the amounts for each distinct article number |
Database system | Reads, filters and processes subsets of data records | Performs aggregate functions |
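The distributed grep row of the table can be sketched as follows. The choice of the file name as intermediate key, the function names and the sample data are assumptions made for this illustration; the grouping loop plays the role of the shuffle phase.

```python
import re
from collections import defaultdict

def map_grep(pattern, filename, text):
    """Emit (filename, line) for every line that matches the pattern."""
    return [(filename, line) for line in text.splitlines() if re.search(pattern, line)]

def reduce_identity(key, values):
    """Pass the matching lines through unchanged."""
    return list(values)

files = [
    ("a.txt", "fest gemauert\nin der erden"),
    ("b.txt", "heute muß die glocke werden\nfrisch gesellen"),
]
grouped = defaultdict(list)
for filename, text in files:
    for key, line in map_grep(r"\bder\b|\bdie\b", filename, text):
        grouped[key].append(line)

hits = {key: reduce_identity(key, lines) for key, lines in grouped.items()}
print(hits)  # {'a.txt': ['in der erden'], 'b.txt': ['heute muß die glocke werden']}
```

All of the work happens in the map function; the reduce function only forwards the second component of each pair, which is why the table describes it as a projection.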
Generalization
After the model turned ten years old in 2014, Google began offering Cloud Dataflow, an extension that provides greater flexibility and is intended to advance cloud computing further.
See also
- Apache Hadoop - Java framework based on the MapReduce algorithm
Web links
Technical articles
- Jeffrey Dean, Sanjay Ghemawat: MapReduce: Simplified Data Processing on Large Clusters. OSDI'04: Sixth Symposium on Operating System Design and Implementation (December 2004)
- Colby Ranger, Ramanan Raghuraman, Arun Penmetsa, Gary Bradski, Christos Kozyrakis: Evaluating MapReduce for Multi-core and Multiprocessor Systems. (PDF; 353 kB) Stanford University
- Why MapReduce Matters to SQL Data Warehousing. Analysis of the introduction of MapReduce/SQL by Aster Data Systems and Greenplum
- Marc de Kruijf, Karthikeyan Sankaralingam: MapReduce for the Cell BE Architecture. (PDF; 528 kB) University of Wisconsin-Madison
- Hung-Chih Yang, Ali Dasdan, Ruey-Lung Hsiao, D. Stott Parker: Map-Reduce-Merge: Simplified Relational Data Processing on Large Clusters. In: Proc. of ACM SIGMOD, 2007, pp. 1029–1040 (this paper shows how to extend MapReduce to relational computing)
- FLuX: the fault-tolerant, load-balancing eXchange operator from UC Berkeley offers an alternative to Google's MapReduce, with failover but additional implementation costs.
Software
- Apache Hadoop MapReduce
- Disco: open source project (Python and Erlang) by the Nokia Research Center
- DryadLINQ: MapReduce implementation by Microsoft Research, based on PLINQ and Dryad
- MATLAB MapReduce: a Hadoop-capable implementation by MathWorks in MATLAB
- Plasma MapReduce: an open source MapReduce implementation in OCaml with its own distributed file system, PlasmaFS. Plasma MapReduce was developed by Gerd Stolpmann (Darmstadt).
- QtConcurrent: open source C++ MapReduce implementation (non-distributed) from Digia's Qt Development Frameworks
- Skynet: Ruby Map/Reduce library
- Splunk.com: data management and analysis engine for big data, based on MapReduce
- Stratosphere PACT programming model: extension and generalization of the MapReduce programming model
References
- Google spotlights data center inner workings. CNET News, Tech news blog
- Jeffrey Dean, Sanjay Ghemawat: MapReduce: Simplified Data Processing on Large Clusters. Google Labs: "Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages."
- Ralf Lämmel (Microsoft): Google's MapReduce Programming Model - Revisited. (PDF)
- USP 7,650,331. United States Patent and Trademark Office
- MapReduce paper