MapReduce

From Wikipedia, the free encyclopedia

MapReduce is a programming model introduced by Google Inc. for concurrent computations over very large amounts of data (several petabytes) on computer clusters. MapReduce is also the name of an implementation of the programming model in the form of a software library.

With the MapReduce procedure, the data is processed in three phases (Map, Shuffle, Reduce), two of which are specified by the user (Map and Reduce). This allows calculations to be parallelized and distributed over several computers. For very large amounts of data, such parallelization may be necessary because the data is too large for a single process (and the executing computer system).

The programming model was inspired by the map and reduce functions that are frequently used in functional programming, even though the library's way of working differs from them. In 2010, a US patent was granted for MapReduce. The main contribution of MapReduce, however, is the underlying system, which heavily parallelizes the calculations, optimizes the reorganization of the data in the shuffle step, and can automatically react to errors in the cluster, such as the failure of entire nodes.

Working method

Illustration of the data flow

[Image: MapReduce2.svg]

The above picture illustrates the data flow in the MapReduce calculation.

  • Map phase:
    • The input data (D, A, T, A) are distributed over a number of map processes (illustrated by colorful rectangles), which each calculate the map function provided by the user.
    • The map processes are ideally carried out in parallel.
    • Each of these map instances stores intermediate results (illustrated by pink stars).
    • From each map instance, data flows into possibly several different intermediate result stores.
  • Shuffle phase:
    • The intermediate results are redistributed according to the output keys produced by the map function, so that all intermediate results with the same key are processed on the same computer system in the next step.
  • Reduce phase:
    • For each set of intermediate results, exactly one reduce process (illustrated by violet rectangles) calculates the reduce function provided by the user and thus the output data (illustrated by violet circles X, Y and Z).
    • Ideally, the reduce processes are also carried out in parallel; a minimal code sketch of this data flow follows the list.
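
This data flow can be sketched in a few lines of Python (a minimal single-process sketch for illustration only; the names run_mapreduce, map_fn and reduce_fn are not part of any real MapReduce library, and a production system would distribute the three phases across a cluster):

 from collections import defaultdict

 def run_mapreduce(input_list, map_fn, reduce_fn):
     # Map phase: call map_fn once for every (key, value) pair.
     intermediate = []
     for key, value in input_list:
         intermediate.extend(map_fn(key, value))

     # Shuffle phase: group the intermediate values by their new key.
     groups = defaultdict(list)
     for key, value in intermediate:
         groups[key].append(value)

     # Reduce phase: call reduce_fn once per key with all of its values;
     # here reduce_fn returns the finished (key, result) pairs.
     output_list = []
     for key, values in groups.items():
         output_list.extend(reduce_fn(key, values))
     return output_list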

Definition of the MapReduce function

The MapReduce library implements a function that computes a new list of key-value pairs (output list) from a list of key-value pairs (input list):

 MapReduce: (K × V)* → (L × W)*

Explanation:

  • The sets K and L contain keys, the sets V and W contain values.
  • All keys k ∈ K are of the same type, e.g. strings.
  • All keys l ∈ L are of the same type, e.g. whole numbers.
  • All values v ∈ V are of the same type, e.g. atoms.
  • All values w ∈ W are of the same type, e.g. floating point numbers.
  • If A and B are sets, then A × B denotes the set of all pairs (a, b) with a ∈ A and b ∈ B (Cartesian product).
  • If M is a set, then M* denotes the set of all finite lists with elements from M (based on the Kleene star); the list can also be empty.

Definition of the map and reduce functions

The user configures the library by providing the two functions Map and Reduce, which are defined as follows:

 Map: K × V → (L × W)*

and

 Reduce: L × W* → W*
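
In Python's type notation, these signatures could be sketched roughly as follows (an illustrative sketch only; the type variables K, V, L, W mirror the sets defined above):

 from typing import Callable, TypeVar

 K = TypeVar("K")  # input keys
 V = TypeVar("V")  # input values
 L = TypeVar("L")  # intermediate/output keys
 W = TypeVar("W")  # intermediate/output values

 # Map: K × V → (L × W)*
 MapFn = Callable[[K, V], list[tuple[L, W]]]

 # Reduce: L × W* → W*
 ReduceFn = Callable[[L, list[W]], list[W]]

 # MapReduce: (K × V)* → (L × W)*
 MapReduceFn = Callable[[list[tuple[K, V]]], list[tuple[L, W]]]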

Map phase

  • Map maps a pair (k, v), consisting of a key k ∈ K and a value v ∈ V, to a list of new pairs (l, w), which play the role of intermediate results; the values w are of the same type as the final results.
  • For a new pair (l, w), the key l assigned by Map refers to a list T_l of intermediate results, in which the value w calculated by Map is collected.
  • The library calls the Map function for each pair (k, v) in the input list.
  • All these Map calculations are independent of each other, so they can be carried out concurrently and distributed over a computer cluster, as the sketch below illustrates.
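
Because of this independence, the map calls can be farmed out to worker processes, for example with Python's standard multiprocessing module (a sketch under the assumption that map_fn is a picklable top-level function; the names are illustrative):

 from multiprocessing import Pool

 def parallel_map_phase(input_list, map_fn, workers=4):
     # Every (key, value) pair may be processed by any worker,
     # since the individual map calls do not depend on each other.
     with Pool(processes=workers) as pool:
         per_pair_results = pool.starmap(map_fn, input_list)
     # Flatten the per-pair result lists into one intermediate list.
     return [pair for result in per_pair_results for pair in result]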

Shuffle phase

  • Before the Reduce phase can start, the results of the Map phase must be grouped into lists according to their new key l.
  • If map and reduce functions are carried out concurrently and distributed, a coordinated data exchange is necessary for this.
  • The performance of a map-reduce system depends largely on how efficiently the shuffle phase is implemented.
  • As a rule, the user only influences the shuffle phase through the design of the keys l (the routing of a key to a reduce worker is sketched below). It is therefore sufficient to optimize the shuffle phase well once, and numerous applications benefit from this.
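
The routing of a key to a reduce worker can be sketched with a stable hash function (illustrative only; crc32 is used here because Python's built-in hash of strings is randomized between processes):

 from zlib import crc32

 def partition(key, num_reducers):
     # Pairs with equal keys always get the same partition number
     # and are therefore sent to the same reduce worker.
     return crc32(key.encode("utf-8")) % num_reducers

For example, partition("der", 3) always returns the same reducer index, so every ("der", 1) pair ends up on the same machine.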

Reduce phase

  • Once all Map calls have been made and all intermediate results are available in the lists T_l, the library calls the Reduce function for each intermediate result list. Reduce computes from it a list of result values w, which the library collects as pairs (l, w) in the output list.
  • The calls to Reduce can also be distributed independently to different processes in the computer cluster.

Note: This representation is somewhat simplified, because the control of the MapReduce procedure will usually aim for a fixed number R of reduce processes. If there are intermediate results for more than R different keys l, intermediate results with different keys are then stored in a common list. The corresponding pairs are sorted by key before the reduce calculation, as the following sketch illustrates.
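
A sketch of this per-reducer sorting and grouping (illustrative; one reducer's input list may contain pairs for several different keys):

 from itertools import groupby
 from operator import itemgetter

 def reduce_partition(pairs, reduce_fn):
     # Sorting by key brings equal keys together, so that groupby
     # can hand each key's complete value list to reduce_fn.
     pairs.sort(key=itemgetter(0))
     output = []
     for key, group in groupby(pairs, key=itemgetter(0)):
         values = [value for _, value in group]
         output.extend(reduce_fn(key, values))
     return output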

Combine phase

Optionally, a combine phase can take place before the shuffle phase. It usually has the same functionality as the reduce function, but is executed on the same node as the map phase. The aim is to reduce the amount of data that has to be processed in the shuffle phase, and thus the network load. The purpose of the combine phase becomes immediately apparent in the word-count example: because words occur with very different frequencies in natural language, a German text would very often produce an output of the form ("und", 1) (the same applies to articles and auxiliary verbs). The combine phase turns, say, 100 messages of the form ("und", 1) into one message of the form ("und", 100). This can significantly reduce the network load, but is not possible in all use cases.
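
For word counting, such a combiner could be sketched as follows (illustrative; it pre-aggregates the pairs locally on the map node before anything is sent over the network):

 from collections import Counter

 def combine(intermediate_pairs):
     # Collapses e.g. 100 pairs ("und", 1) into one pair ("und", 100)
     # before the shuffle phase moves the data across the network.
     counts = Counter()
     for word, n in intermediate_pairs:
         counts[word] += n
     return list(counts.items())

This works here because addition is associative and commutative; a reduce function without these properties cannot be pre-applied on the map nodes, which is the limitation mentioned above.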

Example: Distributed frequency analysis with MapReduce

Problem

For extensive texts you want to find out how often which words occur.

Specification of the map and reduce functions

 map(String name, String document):
  // name: document name ("key")
  // document: document contents ("value")
  for each word w in document:
    EmitIntermediate(w, 1);

 reduce(String word, Iterator partialCounts):
  // word: a word ("key")
  // partialCounts: a list of aggregated partial counts ("values")
  //     for 'word'
  int result = 0;
  for each v in partialCounts:
    result += v;
  Emit(word, result);
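
The same two functions can also be written as runnable Python, mirroring the pseudocode above (a sketch; Emit and EmitIntermediate become yield, and the reduce function here emits the finished (word, count) pair directly):

 def map_fn(name, document):
     # name: document name ("key"); document: document contents ("value")
     for word in document.split():
         yield (word, 1)

 def reduce_fn(word, partial_counts):
     # word: a word ("key"); partial_counts: the collected counts ("values")
     yield (word, sum(partial_counts))

Together with the run_mapreduce sketch given earlier, run_mapreduce(input_list, map_fn, reduce_fn) then computes the word frequencies over all documents in input_list.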

Map phase

  • Map is given a document name name and a document document as a character string.
  • Map traverses the document word by word.
  • Each time a word w is encountered, a 1 is placed in the intermediate result list T_w (if this list does not yet exist, it is created).
  • When all the words have been processed and the text contains n different words, the map phase ends with n intermediate result lists, each belonging to a different word and containing as many 1 entries as that word was found in the document.
  • Many map instances may have been running at the same time, if several documents were passed to the library.

Shuffle phase

  • The intermediate result lists of the individual processes/systems are combined per word w and distributed to the systems that run the reduce processes.

Reduce phase

  • Reduce is called for the word word with the list partialCounts of intermediate results.
  • Reduce runs through the list of intermediate results and adds up all the numbers found.
  • The sum result is returned to the library; it is the number of times the word word was found in all the documents.
  • The intermediate results can be calculated in parallel, by simultaneous Reduce calls.

Overall

  • A list of words and word frequencies is generated from a list of document names and documents.

Exemplary calculation

For example, the following calculation would be conceivable for a classical text:

 Text = "Fest gemauert in der Erden
         Steht die Form, aus Lehm gebrannt.
         Heute muß die Glocke werden,
         Frisch, Gesellen! seid zur Hand.
         Von der Stirne heiß
         Rinnen muß der Schweiß,
         Soll das Werk den Meister loben,
         Doch der Segen kommt von oben."

The text is divided into sentences; it is advisable to normalize them by writing everything in lower case and removing the punctuation marks:

 Eingabeliste = [ (satz_1, "fest gemauert in der erden steht die form aus lehm gebrannt"),
                  (satz_2, "heute muß die glocke werden frisch gesellen seid zur hand"),
                  (satz_3, "von der stirne heiß rinnen muß der schweiß soll das werk den meister loben doch der segen kommt von oben") ]
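
The normalization could be sketched in Python like this (illustrative; normalize would be applied to each sentence before it is put into the input list):

 import string

 def normalize(sentence):
     # Write everything in lower case and remove the punctuation marks.
     lowered = sentence.lower()
     return lowered.translate(str.maketrans("", "", string.punctuation))

For example, normalize("Steht die Form, aus Lehm gebrannt.") returns "steht die form aus lehm gebrannt".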

The input list has three pairs as elements, so we can start three map processes:

 P1 = Map(satz_1, "fest gemauert in der erden steht die form aus lehm gebrannt")
 P2 = Map(satz_2, "heute muß die glocke werden frisch gesellen seid zur hand")
 P3 = Map(satz_3, "von der stirne heiß rinnen muß der schweiß soll das werk den meister loben doch der segen kommt von oben")

The map calls generate these intermediate result pairs:

 P1 = [ ("fest", 1), ("gemauert", 1), ("in", 1), ("der", 1), ("erden", 1),
        ("steht", 1), ("die", 1), ("form", 1), ("aus", 1), ("lehm, 1),
        ("gebrannt", 1) ]
 P2 = [ ("heute", 1), ("muß", 1), ("die", 1), ("glocke", 1), ("werden", 1),
        ("frisch", 1), ("gesellen", 1), ("seid", 1), ("zur", 1), ("hand", 1) ]
 P3 = [ ("von", 1), ("der", 1), ("stirne", 1), ("heiß", 1), ("rinnen", 1),
        ("muß, 1), ("der", 1), ("schweiß", 1), ("soll", 1), ("das", 1),
        ("werk", 1), ("den", 1), ("meister", 1), ("loben", 1), ("doch", 1),
        ("der", 1), ("segen", 1), ("kommt", 1), ("von", 1), ("oben", 1) ]

The map processes deliver their pairs to the MapReduce library, which collects them in the intermediate result lists. The following could happen in parallel (identical timing of the 3 map processes is unrealistic; in reality the executions overlap. The T_word lists exist locally for each map process and are not synchronized between the steps):

 1. Iteration:
    P1:  T_fest     = [ 1 ]     (new)
    P2:  T_heute    = [ 1 ]     (new)
    P3:  T_von      = [ 1 ]     (new)

 2. Iteration:
    P1:  T_gemauert = [ 1 ]     (new)
    P2:  T_muß      = [ 1 ]     (new)
    P3:  T_der      = [ 1 ]     (new)

 3. Iteration:
    P1:  T_in       = [ 1 ]     (new)
    P2:  T_die      = [ 1 ]     (new)
    P3:  T_stirne   = [ 1 ]     (new)

In the fourth step you can see that the intermediate result lists exist locally for each map process and cannot be reused globally:

 4. Iteration:
    P1:  T_der      = [ 1 ]     (new; the 1st map process has no T_der yet, only P3 does)
    P2:  T_glocke   = [ 1 ]     (new)
    P3:  T_heiss    = [ 1 ]     (new)

 5. Iteration:
    P1:  T_erden    = [ 1 ]     (new)
    P2:  T_werden   = [ 1 ]     (new)
    P3:  T_rinnen   = [ 1 ]     (new)

 6. Iteration:
    P1:  T_steht    = [ 1 ]     (new)
    P2:  T_frisch   = [ 1 ]     (new)
    P3:  T_muß      = [ 1 ]     (new; the 3rd map process has no T_muß yet, only P2 does)

In the seventh step, it happens for the first time that a further occurrence is collected in an intermediate result list that has already been created:

 7. Iteration:
    P1:  T_die      = [ 1 ]     (new; the 1st map process has no T_die yet)
    P2:  T_gesellen = [ 1 ]     (new)
    P3:  T_der      = [ 1, 1 ]  (appending to the list the 3rd map process has had since iteration 2)

etc.

After 20 iterations, all three map processes have finished their work; the map phase ends and the reduce phase begins. The intermediate result lists that were created by different map processes for the same word are merged. For each of the resulting intermediate result lists (listed here sorted)

                     reduce
   T_der      = [ 1 ] ++ [ 1, 1, 1 ] -> [ 4 ]
   T_die      = [ 1 ] ++ [ 1 ]       -> [ 2 ]
   T_fest     = [ 1 ]                -> [ 1 ]
   T_gemauert = [ 1 ]                -> [ 1 ]
   T_glocke   = [ 1 ]                -> [ 1 ]
   T_heiss    = [ 1 ]                -> [ 1 ]
   T_heute    = [ 1 ]                -> [ 1 ]
   T_in       = [ 1 ]                -> [ 1 ]
   T_muß      = [ 1 ] ++ [ 1 ]       -> [ 2 ]
   T_stirne   = [ 1 ]                -> [ 1 ]
   T_von      = [ 1, 1 ]             -> [ 2 ]
   .
   .
   . (for all the different T lists)

we can start one reduce process each, in parallel, which adds up the elements. The result of MapReduce looks something like this:

 Ausgabeliste = [ ("fest", 1), ("heute", 1), ("von", 2), ("gemauert", 1),
                  ("muß", 2), ("der", 4), ("in", 1), ("die", 2), .. ]

Further examples

  • Distributed grep – Map function: emits the matching line (the hit) into a buffer. Reduce function: passes the hit through unchanged (identity mapping, more precisely: projection onto the 2nd component).
  • Sales evaluation – Map function: writes the item number and the amount for each receipt into a buffer. Reduce function: adds up the amounts for each distinct item number.
  • Database system – Map function: reads, filters and processes subsets of the data records. Reduce function: computes aggregate functions.
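
The distributed grep from the first example could be sketched like this (illustrative; the match pattern is fixed when the map function is created, and the names are not from any real library):

 import re

 def make_grep_map(pattern):
     regex = re.compile(pattern)
     def grep_map(file_name, contents):
         # Map: emit every matching line (the hit) under the file name.
         for line in contents.splitlines():
             if regex.search(line):
                 yield (file_name, line)
     return grep_map

 def grep_reduce(file_name, lines):
     # Reduce: nothing to aggregate; pass the hits through unchanged
     # (identity mapping, i.e. projection onto the value component).
     return [(file_name, line) for line in lines]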

Generalization

After the procedure turned ten years old in 2014, Google began offering Cloud Dataflow as an extension that provides greater flexibility and is intended to promote cloud computing even more.

External links

  • Commons: MapReduce – collection of images, videos and audio files

Software

  • PlasmaFS. Plasma MapReduce was developed by Gerd Stolpmann (Darmstadt)

  • Splunk.com data management and analysis engine for big data based on MapReduce
  • Stratosphere PACT programming model: Extension and generalization of the MapReduce programming model

References

  1. Google spotlights data center inner workings. CNET News, Tech news blog.
  2. Jeffrey Dean, Sanjay Ghemawat: MapReduce: Simplified Data Processing on Large Clusters. Google Labs: "Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages."
  3. Ralf Lämmel (Microsoft): Google's MapReduce Programming Model – Revisited. (PDF)
  4. US Patent 7,650,331. United States Patent and Trademark Office.
  5. MapReduce paper