Google Cluster Computing Faculty Training Workshop
Module IV: MapReduce Theory, Implementation, and Algorithms

This presentation includes content © University of Washington and/or Google, Inc.
Redistributed under the Creative Commons Attribution 3.0 license.
All other contents:

Overview

Functional Programming Recap
MapReduce Theory & Implementation
MapReduce Algorithms
Functional Programming Review

Functional operations do not modify data structures: they always create new ones
Original data still exists in unmodified form
Data flows are implicit in program design
Order of operations does not matter

Functional Programming Review

fun foo(l: int list) =
sum(l) + mul(l) + length(l)
The order of sum() and mul(), etc., does not matter – they do not modify l

Functional Updates Do Not Modify Structures

fun append(x, lst) =
let lst' = reverse lst in
reverse ( x :: lst' )
The append() function above reverses the list, prepends the new element, and reverses the result – which appends the item to the end.
But it never modifies lst!

Functions Can Be Used As Arguments

fun DoDouble(f, x) = f (f x)

It does not matter what f does to its argument; DoDouble() will apply it twice.
What is the type of this function?

Map

map f lst: ('a->'b) -> ('a list) -> ('b list)
Creates a new list by applying f to each element of the input list; returns output in order.

Fold

fold f x0 lst: ('a*'b->'b) -> 'b -> ('a list) -> 'b
Moves across a list, applying f to each element plus an accumulator. f returns the next accumulator value, which is combined with the next element of the list.

fold left vs. fold right

Order of list elements can be significant
Fold left moves left-to-right across the list
Fold right moves from right-to-left
SML Implementation:
fun foldl f a [] = a
| foldl f a (x::xs) = foldl f (f(x, a)) xs
fun foldr f a [] = a
  | foldr f a (x::xs) = f(x, (foldr f a xs))

Example

fun foo(l: int list) =
sum(l) + mul(l) + length(l)
How can we implement this?
Example (Solved)

fun foo(l: int list) =
sum(l) + mul(l) + length(l)
fun sum(lst) = foldl (fn (x,a)=>x+a) 0 lst
fun mul(lst) = foldl (fn (x,a)=>x*a) 1 lst
fun length(lst) = foldl (fn (x,a)=>1+a) 0 lst
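As a hedged sketch (an illustrative translation, not from the slides), the same three foldl-based definitions can be written in Python, where functools.reduce plays foldl's role. Note that reduce passes the accumulator first, the opposite of SML's f(x, a) argument order.

```python
from functools import reduce

# Illustrative Python translation of the SML foldl solutions above.
# functools.reduce(f, lst, init) folds left-to-right like foldl, but
# calls f(accumulator, element) rather than SML's f(element, accumulator).

def sum_list(lst):
    return reduce(lambda a, x: a + x, lst, 0)

def mul_list(lst):
    return reduce(lambda a, x: a * x, lst, 1)

def length(lst):
    return reduce(lambda a, x: a + 1, lst, 0)

def foo(l):
    return sum_list(l) + mul_list(l) + length(l)

# foo([1, 2, 3]) -> 6 + 6 + 3 = 15
```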
map Implementation

fun map f [] = []
  | map f (x::xs) = (f x) :: (map f xs)

This implementation moves left-to-right across the list, mapping elements one at a time
… But does it need to?

Implicit Parallelism In map

In a purely functional setting, elements of a list being computed by map cannot see the effects of the computations on other elements
If the order of application of f to the elements of the list does not matter, we can reorder or parallelize execution
This is the “secret” that MapReduce exploits

MapReduce

Motivation: Large Scale Data Processing

Want to process lots of data (> 1 TB)
Want to parallelize across hundreds/thousands of CPUs
… Want to make this easy
MapReduce

Automatic parallelization & distribution
Fault-tolerant
Provides status and monitoring tools
Clean abstraction for programmers

Programming Model

Borrows from functional programming
Users implement interface of two functions:
map (in_key, in_value) ->
(out_key, intermediate_value) list
reduce (out_key, intermediate_value list) ->
out_value list
map

Records from the data source (lines out of files, rows of a database, etc.) are fed into the map function as key/value pairs: e.g., (filename, line).
map() produces one or more intermediate values along with an output key from the input.

reduce

After the map phase is over, all the intermediate values for a given output key are combined together into a list
reduce() combines those intermediate values into one or more final values for that same output key
(in practice, usually only one final value per key)
reduce

reduce (out_key, intermediate_value list) ->
out_value list

Parallelism

map() functions run in parallel, creating different intermediate values from different input data sets
reduce() functions also run in parallel, each working on a different output key
All values are processed independently
Bottleneck: reduce phase can’t start until map phase is completely finished.

Example: Count word occurrences

map(String input_key, String input_value):
// input_key: document name
// input_value: document contents
for each word w in input_value:
EmitIntermediate(w, "1");
reduce(String output_key, Iterator intermediate_values):
// output_key: a word
// output_values: a list of counts
int result = 0;
for each v in intermediate_values:
result += ParseInt(v);
Emit(AsString(result));

Example vs. Actual Source Code

Example is written in pseudo-code
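As a hedged sketch (a single-machine simulation of our own, not the Google library), the word-count example can be run end-to-end in Python to make the map, shuffle, and reduce phases concrete:

```python
from collections import defaultdict

def map_fn(input_key, input_value):
    # input_key: document name; input_value: document contents
    for word in input_value.split():
        yield (word, 1)

def reduce_fn(output_key, intermediate_values):
    # Sum the list of counts for one word
    return sum(intermediate_values)

def run_mapreduce(inputs, map_fn, reduce_fn):
    # "Shuffle" step: group all intermediate values by key
    groups = defaultdict(list)
    for key, value in inputs:
        for k, v in map_fn(key, value):
            groups[k].append(v)
    return {k: reduce_fn(k, vs) for k, vs in sorted(groups.items())}

counts = run_mapreduce([("doc1", "the cat sat"), ("doc2", "the dog")],
                       map_fn, reduce_fn)
# counts == {"cat": 1, "dog": 1, "sat": 1, "the": 2}
```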
Actual implementation is in C++, using a MapReduce library
Bindings for Python and Java exist via interfaces
True code is somewhat more involved (defines how the input key/values are divided up and accessed, etc.)

Locality

Master program divvies up tasks based on location of data: tries to have map() tasks on same machine as physical file data, or at least same rack
map() task inputs are divided into 64 MB blocks: same size as Google File System chunks
Fault Tolerance

Master detects worker failures
Re-executes completed & in-progress map() tasks
Re-executes in-progress reduce() tasks
Master notices particular input key/values cause crashes in map(), and skips those values on re-execution.
Effect: Can work around bugs in third-party libraries!

Optimizations

No reduce can start until map is complete:
A single slow disk controller can rate-limit the whole process
Master redundantly executes “slow-moving” map tasks; uses results of first copy to finish

Why is it safe to redundantly execute map tasks? Wouldn’t this mess up the total computation?

Optimizations

“Combiner” functions can run on same machine as a mapper
Causes a mini-reduce phase to occur before the real reduce phase, to save bandwidth

Under what conditions is it sound to use a combiner?

The Example Again

map(String input_key, String input_value):
// input_key: document name
// input_value: document contents
for each word w in input_value:
EmitIntermediate(w, "1");
reduce(String output_key, Iterator intermediate_values):
// output_key: a word
// output_values: a list of counts
int result = 0;
for each v in intermediate_values:
result += ParseInt(v);
Emit(AsString(result));

MapReduce Conclusions

MapReduce has proven to be a useful abstraction
Greatly simplifies large-scale computations at Google
Functional programming paradigm can be applied to large-scale applications
Fun to use: focus on problem, let library deal w/ messy details

Part 2: Algorithms

Algorithms for MapReduce

Sorting
Searching
Indexing
Classification
TF-IDF
Breadth-First Search / SSSP
PageRank
Clustering

MapReduce Jobs

Tend to be very short, code-wise
IdentityReducer is very common
“Utility” jobs can be composed
Represent a data flow, more so than a procedure

Sort: Inputs

A set of files, one value per line.
Mapper key is file name, line number
Mapper value is the contents of the line

Sort Algorithm

Takes advantage of reducer properties: (key, value) pairs are processed in order by key; reducers are themselves ordered
Mapper: Identity function for value
(k, v) -> (v, _)
Reducer: Identity function (k’, _) -> (k’, “”)

Sort: The Trick

(key, value) pairs from mappers are sent to a particular reducer based on hash(key)
Must pick the hash function for your data such that k1 < k2 => hash(k1) < hash(k2)
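A minimal sketch of such an order-preserving partition function, assuming lowercase ASCII keys and four reducers (both assumptions are ours, for illustration):

```python
NUM_REDUCERS = 4  # assumed cluster size for illustration

def partition(key):
    # Bucket keys by first letter so that k1 < k2 implies
    # partition(k1) <= partition(k2): reducer outputs, concatenated
    # in reducer order, are then globally sorted.
    bucket = (ord(key[0]) - ord('a')) * NUM_REDUCERS // 26
    return min(bucket, NUM_REDUCERS - 1)

# partition("apple") == 0, partition("zebra") == 3
```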
Final Thoughts on Sort

Used as a test of Hadoop’s raw speed
Essentially “IO drag race”
Highlights utility of GFS

Search: Inputs

A set of files containing lines of text
A search pattern to find
Mapper key is file name, line number
Mapper value is the contents of the line
Search pattern sent as special parameter

Search Algorithm

Mapper:
Given (filename, some text) and “pattern”, if “text” matches “pattern” output (filename, _)
Reducer:
Identity function

Search: An Optimization

Once a file is found to be interesting, we only need to mark it that way once
Use Combiner function to fold redundant (filename, _) pairs into a single one
Reduces network I/O

Indexing: Inputs

A set of files containing lines of text
Mapper key is file name, line number
Mapper value is the contents of the line
Inverted Index Algorithm

Mapper: For each word in (file, words), map to (word, file)
Reducer: Identity function

Inverted Index: Data flow

An Aside: Word Count

Word count was described in module I
Mapper for Word Count is (word, 1) for each word in input line
Strikingly similar to inverted index
Common theme: reuse/modify existing mappers
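A single-machine sketch of the inverted-index mapper above, with a grouping step standing in for the shuffle (the file names and driver are ours, for illustration):

```python
from collections import defaultdict

def map_fn(filename, line):
    # Emit (word, filename) for each word, as in the inverted-index mapper
    for word in line.split():
        yield (word, filename)

def build_index(files):
    index = defaultdict(set)   # set folds duplicate (word, file) pairs
    for filename, contents in files:
        for line in contents.splitlines():
            for word, fname in map_fn(filename, line):
                index[word].add(fname)
    return {w: sorted(fs) for w, fs in index.items()}

index = build_index([("a.txt", "to be or not"), ("b.txt", "to do")])
# index["to"] == ["a.txt", "b.txt"]
```

Changing the mapper's emitted value from filename to 1 (and summing in the reducer) recovers word count, illustrating the reuse theme above.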
Bayesian Classification

Files containing classification instances are sent to mappers
Map (filename, instance) -> (instance, class)
Identity Reducer

Bayesian Classification

Existing toolsets exist to perform Bayes classification on instances
E.g., WEKA, already in Java!
Another example of discarding the input key

TF-IDF

Term Frequency – Inverse Document Frequency
Relevant to text processing
Common web analysis algorithm
The Algorithm, Formally

tf(i, j) = n(i, j) / Σ_k n(k, j)
idf(i) = log( |D| / |{d : t_i ∈ d}| )
tfidf(i, j) = tf(i, j) × idf(i)

n(i, j) : number of occurrences of term t_i in document d_j
|D| : total number of documents in the corpus
|{d : t_i ∈ d}| : number of documents where the term t_i appears (that is, n(i, j) ≠ 0)
Information We Need

Number of times term X appears in a given document
Number of terms in each document
Number of documents X appears in
Total number of documents

Job 1: Word Frequency in Doc

Mapper
Input: (docname, contents)
Output: ((word, docname), 1)
Reducer
Sums counts for word in document
Outputs ((word, docname), n)
Combiner is same as Reducer

Job 2: Word Counts For Docs

Mapper
Input: ((word, docname), n)
Output: (docname, (word, n))
Reducer
Sums frequency of individual n’s in same doc
Feeds original data through
Outputs ((word, docname), (n, N))
Job 3: Word Frequency In Corpus

Mapper
Input: ((word, docname), (n, N))
Output: (word, (docname, n, N, 1))
Reducer
Sums counts for word in corpus
Outputs ((word, docname), (n, N, m))

Job 4: Calculate TF-IDF

Mapper
Input: ((word, docname), (n, N, m))
Assume D is known (or, easy MR to find it)
Output ((word, docname), TF*IDF)
Reducer
Just the identity function
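For intuition, the quantities the four jobs compute (n, N, m, and D) can be combined directly on one machine. This is a sketch on a toy corpus of our own, not the distributed pipeline:

```python
import math

docs = {"d1": "a a b", "d2": "a c"}       # toy corpus (ours)
D = len(docs)                              # total number of documents
tokens = {d: text.split() for d, text in docs.items()}

def tf_idf(word, doc):
    n = tokens[doc].count(word)            # times word appears in doc
    N = len(tokens[doc])                   # total terms in doc
    m = sum(1 for t in tokens.values() if word in t)  # docs containing word
    return (n / N) * math.log(D / m)

# "a" appears in every document, so its IDF (and score) is 0;
# "b" appears only in d1, so tf_idf("b", "d1") > 0.
```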
Final Thoughts on TF-IDF

Several small jobs add up to full algorithm
Lots of code reuse possible
Stock classes exist for aggregation, identity
Jobs 3 and 4 can really be done at once in the same reducer, saving a write/read cycle

BFS: Motivating Concepts

Performing computation on a graph data structure requires processing at each node
Each node contains node-specific data as well as links (edges) to other nodes
Computation must traverse the graph and perform the computation step
How do we traverse a graph in MapReduce? How do we represent the graph for this?

Breadth-First Search

Breadth-First Search is an iterated algorithm over graphs
Frontier advances from origin by one level with each pass

Breadth-First Search & MapReduce

Problem: This doesn't “fit” into MapReduce
Solution: Iterated passes through MapReduce – map some nodes; the result includes additional nodes which are fed into successive MapReduce passes

Breadth-First Search & MapReduce

Problem: Sending the entire graph to a map task (or hundreds/thousands of map tasks) involves an enormous amount of memory
Solution: Carefully consider how we represent graphs

Graph Representations

The most straightforward representation of graphs uses references from each node to its neighbors

Direct References

Structure is inherent to the object
Iteration requires linked list “threaded through” graph
Requires common view of shared memory (synchronization!)
Not easily serializable

class GraphNode
{
  Object data;
  Vector<GraphNode> out_edges;
  GraphNode iter_next;
}

Adjacency Matrices

Another classic graph representation. M[i][j] = 1 implies a link from node i to j.
Naturally encapsulates iteration over nodes

Adjacency Matrices: Sparse Representation

The adjacency matrix for most large graphs (e.g., the web) will be overwhelmingly full of zeros.
Each row of the matrix is absurdly long
Sparse matrices only include non-zero elements

Sparse Matrix Representation
1: (3, 1), (18, 1), (200, 1)
2: (6, 1), (12, 1), (80, 1), (400, 1)
3: (1, 1), (14, 1)
…

Sparse Matrix Representation
1: 3, 18, 200
2: 6, 12, 80, 400
3: 1, 14
…

Finding the Shortest Path

A common graph search application is finding the shortest path from a start node to one or more target nodes
Commonly done on a single machine with Dijkstra's Algorithm
Can we use BFS to find the shortest path via MapReduce?
This is called the single-source shortest path problem (a.k.a. SSSP).

Finding the Shortest Path: Intuition

We can define the solution to this problem inductively:
DistanceTo(startNode) = 0
For all nodes n directly reachable from startNode, DistanceTo(n) = 1
For all nodes n reachable from some other set of nodes S,
DistanceTo(n) = 1 + min(DistanceTo(m), m ∈ S)

From Intuition to Algorithm

A map task receives a node n as a key, and (D, points-to) as its value
D is the distance to the node from the start
points-to is a list of nodes reachable from n
For each p ∈ points-to, emit (p, D+1)
Reduce task gathers possible distances to a given p and selects the minimum one

What This Gives Us

This MapReduce task can advance the known frontier by one hop
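A hedged single-machine sketch of one such pass (names and driver are ours; unit edge weights, with INF marking unreached nodes):

```python
from collections import defaultdict

INF = float("inf")

def map_fn(node, state):
    dist, points_to = state
    yield (node, (dist, points_to))       # carry the graph structure along
    if dist < INF:
        for p in points_to:
            yield (p, (dist + 1, []))     # candidate distance for p

def reduce_fn(node, values):
    best = min(d for d, _ in values)                  # minimum known distance
    edges = max((pts for _, pts in values), key=len)  # recover points-to list
    return (best, edges)

def bfs_pass(graph):
    # Shuffle step: group map outputs by node, then reduce each group
    groups = defaultdict(list)
    for node, state in graph.items():
        for k, v in map_fn(node, state):
            groups[k].append(v)
    return {n: reduce_fn(n, vs) for n, vs in groups.items()}

g = {"s": (0, ["a"]), "a": (INF, ["b"]), "b": (INF, [])}
g = bfs_pass(g)   # frontier reaches a
g = bfs_pass(g)   # frontier reaches b
# g == {"s": (0, ["a"]), "a": (1, ["b"]), "b": (2, [])}
```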
To perform the whole BFS, a non-MapReduce component then feeds the output of this step back into the MapReduce task for another iteration
Problem: Where'd the points-to list go?
Solution: Mapper emits (n, points-to) as well

Blow-up and Termination

This algorithm starts from one node
Subsequent iterations include many more nodes of the graph as frontier advances
Does this ever terminate?
Yes! Eventually, routes between nodes will stop being discovered and no better distances will be found. When distance is the same, we stop
Mapper should emit (n, D) to ensure that the “current distance” is carried into the reducer

Adding weights

Weighted-edge shortest path is more useful than the cost==1 approach
Simple change: points-to list in map task includes a weight 'w' for each pointed-to node
emit (p, D+w_p) instead of (p, D+1) for each node p
Works for positive-weighted graphs

Comparison to Dijkstra

Dijkstra's algorithm is more efficient because at any step it only pursues edges from the minimum-cost path inside the frontier
MapReduce version explores all paths in parallel; not as efficient overall, but the architecture is more scalable
Equivalent to Dijkstra for the weight=1 case

PageRank: Random Walks Over The Web

If a user starts at a random web page and surfs by clicking links and randomly entering new URLs, what is the probability that s/he will arrive at a given page?
The PageRank of a page captures this notion
More “popular” or “worthwhile” pages get a higher rank

PageRank: Visually

PageRank: Formula

Given page A, and pages T1 through Tn linking to A, PageRank is defined as:
PR(A) = (1-d) + d (PR(T1)/C(T1) + ... +
PR(Tn)/C(Tn))
C(P) is the cardinality (out-degree) of page P
d is the damping (“random URL”) factor

PageRank: Intuition

Calculation is iterative: PR_{i+1} is based on PR_i
Each page distributes its PR_i to all pages it links to. Linkees add up their awarded rank fragments to find their PR_{i+1}
d is a tunable parameter (usually = 0.85) encapsulating the “random jump factor”

PR(A) = (1-d) + d (PR(T1)/C(T1) + ... + PR(Tn)/C(Tn))

PageRank: First Implementation

Create two tables 'current' and 'next' holding the PageRank for each page. Seed 'current' with initial PR values
Iterate over all pages in the graph, distributing PR from 'current' into 'next' of linkees
current := next; next := fresh_table();
Go back to the iteration step, or end if converged

Distribution of the Algorithm

Key insights allowing parallelization:
The 'next' table depends on 'current', but not on any other rows of 'next'
Individual rows of the adjacency matrix can be processed in parallel
Sparse matrix rows are relatively small
Distribution of the Algorithm

Consequences of these insights:
We can map each row of 'current' to a list of PageRank “fragments” to assign to linkees
These fragments can be reduced into a single PageRank value for a page by summing
Graph representation can be even more compact; since each element is simply 0 or 1, only transmit column numbers where it's 1

Phase 1: Parse HTML

Map task takes (URL, page content) pairs and maps them to (URL, (PR_init, list-of-urls))
PR_init is the “seed” PageRank for the URL
list-of-urls contains all pages pointed to by URL
Reduce task is just the identity function

Phase 2: PageRank Distribution

Map task takes (URL, (cur_rank, url_list))
For each u in url_list, emit (u, cur_rank/|url_list|)
Emit (URL, url_list) to carry the points-to list along through iterations
PR(A) = (1-d) + d (PR(T1)/C(T1) + ... + PR(Tn)/C(Tn))

Phase 2: PageRank Distribution

Reduce task gets (URL, url_list) and many (URL, val) values
Sum vals and fix up with d
Emit (URL, (new_rank, url_list))

PR(A) = (1-d) + d (PR(T1)/C(T1) + ... + PR(Tn)/C(Tn))

Finishing up...

A non-parallelizable component determines whether convergence has been achieved (Fixed number of iterations? Comparison of key values?)
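Combining the Phase 2 map and reduce steps on a single machine gives the following sketch (naming is ours; ranks maps each URL to (rank, out-links)):

```python
from collections import defaultdict

d = 0.85  # damping factor

def pagerank_pass(ranks):
    fragments = defaultdict(float)
    links = {}
    for url, (rank, out) in ranks.items():
        links[url] = out                      # carry url_list along
        for u in out:
            fragments[u] += rank / len(out)   # emit (u, cur_rank/|url_list|)
    # Reduce step: sum the fragments for each URL and "fix up with d"
    return {url: ((1 - d) + d * fragments[url], out)
            for url, out in links.items()}

g = {"A": (1.0, ["B"]), "B": (1.0, ["A"])}
g = pagerank_pass(g)
# A and B each keep rank 1.0: (1 - d) + d * 1.0 == 1.0
```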
If so, write out the PageRank lists - done!
Otherwise, feed the output of Phase 2 into another Phase 2 iteration

PageRank Conclusions

MapReduce isn't the greatest at iterated computation, but it still helps run the “heavy lifting”
Key element in parallelization is independent PageRank computations in a given step
Parallelization requires thinking about minimum data partitions to transmit (e.g., compact representations of graph rows)
Even the implementation shown today doesn't actually scale to the whole Internet; but it works for intermediate-sized graphs

Clustering

What is clustering?

Google News

They didn’t pick all 3,400,217 related articles by hand…
Or Amazon.com
Or Netflix…

Other less glamorous things...

Hospital Records
Scientific Imaging
Related genes, related stars, related sequences
Market Research
Segmenting markets, product positioning
Social Network Analysis
Data mining
Image segmentation…

The Distance Measure

How the similarity of two elements in a set is determined, e.g.:
Euclidean Distance
Manhattan Distance
Inner Product Space
Maximum Norm
Or any metric you define over the space…

Types of Algorithms

Hierarchical Clustering vs.
Partitional Clustering

Types of Algorithms

Hierarchical Clustering

Builds or breaks up a hierarchy of clusters.

Partitional Clustering
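The distance measures listed under “The Distance Measure” can each be sketched in a few lines of Python (for points given as equal-length tuples; the sample points are ours):

```python
import math

def euclidean(x, y):
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(x, y)))

def manhattan(x, y):
    return sum(abs(a - b) for a, b in zip(x, y))

def maximum_norm(x, y):
    return max(abs(a - b) for a, b in zip(x, y))

def inner_product(x, y):
    # A similarity, not a metric: larger means "more alike"
    return sum(a * b for a, b in zip(x, y))

p, q = (0, 0), (3, 4)
# euclidean(p, q) == 5.0; manhattan(p, q) == 7; maximum_norm(p, q) == 4
```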