Hadoop Notes

Hadoop – distributed storage/compute system

2 components
HDFS – storage
MapReduce – compute

data is split into blocks of 64MB or 128MB

Traditional RDBMS – schema on write; can’t handle unstructured data; reads are fast; enforced structure supports governance

Hadoop – Schema on read; any kind of data file

Flexibility, Scalability, Economics (keeping more data online)

Volume (amount of data), Velocity (growth of data), Variety (different sources of data)

Open Time Series Database

Multiples of:
1 HD + 2 cores + 6-8GB RAM tend to work well

Slave Nodes:
Hyperthreading should be enabled
Disk/network i/o are typical bottlenecks
Slave node config specifies max # of Map and Reduce tasks that can run simultaneously on that node
Each Map or Reduce task will take 1GB to 2GB of RAM
Should not use virtual memory
Ensure enough RAM to run all tasks, plus overhead for DataNode and TaskTracker daemons, plus OS
Rule of thumb for memory: total # of tasks = 1.5 x # logical processor cores
More spindles is better
4-12 disks per node
Use 3.5″ disks (faster, cheaper, higher capacity than 2.5″)
7200 RPM SATA drives are fine
8 x 1.5TB is better than 6 x 2TB
Practical max is 24TB per slave node (more than that results in massive network traffic if the node dies and block re-replication takes place)
No benefit from RAID
HDFS provides built-in redundancy
RAID striping (RAID0) is slower than the JBOD config used by HDFS; limited by the slowest disk in the RAID array
Disk ops on JBOD are independent; avg speed is greater than that of the slowest disk
Virtualization is not worth considering
Blade servers not recommended
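
The slave node sizing rules above can be turned into a quick back-of-the-envelope calculation. This is only a sketch: the function name, the 2GB-per-task default (the notes give a 1GB to 2GB range), and the 4GB allowance for the DataNode/TaskTracker daemons plus OS are assumptions, not figures from these notes.

```python
def slave_node_plan(logical_cores, gb_per_task=2.0, overhead_gb=4.0):
    """Estimate task slots and RAM for one slave node.

    Applies the rule of thumb from the notes:
        total # of tasks = 1.5 x # logical processor cores
    gb_per_task and overhead_gb are assumed values; tune them per workload.
    """
    task_slots = int(1.5 * logical_cores)
    # RAM for all tasks running at once, plus daemons and OS
    ram_gb = task_slots * gb_per_task + overhead_gb
    return task_slots, ram_gb


slots, ram = slave_node_plan(logical_cores=8)
print(f"{slots} task slots, about {ram:.0f}GB RAM")
```

With 8 logical cores this gives 12 task slots, so the node needs enough physical RAM for all 12 tasks without touching virtual memory.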

Master Nodes:
Single point of failure
If NameNode goes down, cluster is inaccessible
If JobTracker goes down, no jobs can run on cluster. Currently running jobs will fail.
Carrier-class h/w (dual power supplies, dual ethernet cards, RAIDed HDs, RAM >32GB)

Network considerations:
Hadoop is bw intensive; DataNodes send heartbeats to the NameNode every 3s
Use dedicated switches for cluster
Nodes connected at top of rack switch, min 1Gb/s
If large amts of intermediate data are generated, consider 10Gb/s (alternative is to bond 2x 1Gb/s interfaces)

Node configuration:
CentOS, RHEL; RHEL on master nodes, CentOS on slaves
Do not use Linux LVM
Check BIOS settings (e.g. SATA drives should not have IDE emulation enabled)
Test disk i/o speed with “hdparm -t /dev/sda1”; should see >70MB/s, anything less indicates problems
Hadoop has no specific disk partitioning reqts
Mount disks with noatime option
Common directory structure for data mount points (4 disk example):
– /data/1/dfs/nn
– /data/2/dfs/dn
– /data/3/dfs/snn
– /data/4/mapred/loc
Set vm.swappiness to 0 or 5 in sysctl.conf
Cloudera recommends ext3/ext4
XFS provides some performance benefit during kickstart (formats in 0s)
XFS has some perf issues (slow deletes, problems when machine runs out of memory in some versions)
Increase nofile ulimit for mapred, hdfs users to 32k in limits.conf
Disable IPv6
Disable SELinux
Install NTP
Java JDK 1.6.0u24 or 1.6.0u26
http://hadoop.apache.org (Vanilla base)
Or Cloudera’s Distribution including Apache Hadoop (CDH) starts with the latest stable Hadoop distribution
– interoperability, patches, bug fixes, more tools
– CDH does integration testing; its Hadoop version may lag a bit behind Apache’s because it undergoes more testing

Hadoop concepts
– Open source project by Apache Software Foundation
– Originally based on papers published by Google in 2003, 2004
– 2 core components (HDFS, MapReduce)
– Other projects based around core Hadoop (Pig, Hive, HBase, Flume, Oozie, Sqoop, etc.)
– Hadoop cluster (set of machines running HDFS, MapReduce)
– individual machines are known as nodes
– cluster can have 1 or more nodes

HDFS (written in Java)
– storing data
– data is split into blocks and distributed across multiple nodes in the cluster (block is typically 64MB or 128MB)
– each block is replicated multiple times (default = 3x, replicas are stored on different nodes)
– provides durability, fault tolerance, data throughput
– sits on top of native fs such as ext3/4, or xfs
– provides redundant storage for massive amounts of data
– performs best with a ‘modest’ number of files (millions rather than billions; each file is 100MB or more)
– files in HDFS are ‘write once’; no random writes to files are allowed
– optimized for large, streaming reads of files rather than random reads
– blocks are stored as standard files on the DataNodes in a set of directories specified in Hadoop’s config files
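
To make the block and replication arithmetic concrete, here is a small sketch. The function name and the 1000MB example file are made up for illustration; the 128MB block size and 3x replication are the typical values mentioned above.

```python
import math


def hdfs_footprint(file_size_mb, block_size_mb=128, replication=3):
    """Return (number of blocks, raw MB stored across the cluster).

    The last block of a file only occupies as much disk as the data it
    holds, so raw storage is file size x replication factor.
    """
    num_blocks = math.ceil(file_size_mb / block_size_mb)
    raw_storage_mb = file_size_mb * replication
    return num_blocks, raw_storage_mb


blocks, raw_mb = hdfs_footprint(1000)
print(f"{blocks} blocks, {raw_mb}MB of raw storage")
```

A 1000MB file therefore becomes 8 blocks (7 full, 1 partial), and each block is stored on 3 different DataNodes.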

MapReduce
– system used to process data in the Hadoop cluster
– 2 phases (Map, then Reduce); between the 2 phases is a stage known as “shuffle and sort”
– each Map task operates on a discrete portion of the overall dataset, typically 1 HDFS block of data
– after all Maps are complete, the MapReduce system distributes the intermediate data to nodes which perform the reduce phase

How files are stored
– Each block is 64 or 128MB
– Data is distributed across many machines at load time
– Blocks are replicated across multiple machines, known as DataNodes
– Master node is called the NameNode which keeps track of which blocks make up a file and where these blocks are located (known as metadata)

HDFS NameNode
– NameNode daemon must be running at all times or else cluster will be inaccessible
– NameNode holds all of its metadata in RAM for fast access; keeps record of changes on disk for crash recovery
– separate Secondary NameNode takes care of housekeeping tasks for NameNode, but is NOT a backup to NameNode!

NameNode High Availability in CDH4
– CDH4 introduced HA for the NameNode
– Instead of single NameNode, there are now 2 (active, standby)
– If Active fails, Standby NameNode can automatically take over
– Standby NameNode does the work performed by the Secondary NameNode in ‘classic’ HDFS
– HA HDFS does not run a Secondary NameNode daemon
– your sysadmin will choose whether to set up cluster with NameNode HA or not

When a client app reads a file:
– it communicates with NameNode to determine which blocks make up file and which DataNodes those blocks reside on
– it then communicates directly with the DataNodes to read the data
– NameNode will not be a bottleneck, since the file data itself never passes through it

Accessing HDFS
– applications can read/write HDFS files directly via Java API
– typically, files are created on local filesystem and must be moved into HDFS
– files stored in HDFS may need to be moved to a machine’s local filesystem
– access to HDFS from the cmd line is achieved through “hadoop fs” cmd

Hadoop fs Examples (copyFromLocal is synonym for put; copyToLocal is synonym for get)
# Copy file foo.txt from local disk to user’s dir in HDFS (copy file to /user/username/foo.txt)
hadoop fs -put foo.txt foo.txt
# Get a dir listing of user’s home dir in HDFS
hadoop fs -ls
# Get a dir listing of HDFS root dir
hadoop fs -ls /
# Display contents of HDFS file /user/fred/bar.txt
hadoop fs -cat /user/fred/bar.txt
# Copy that file to local disk, named baz.txt
hadoop fs -get /user/fred/bar.txt baz.txt
# Create dir called “input” under the user’s home dir
hadoop fs -mkdir input
# Delete dir “input_old” and all of its contents
hadoop fs -rm -r input_old

How MapReduce works
– distributing a task across multiple nodes
– each node processes data stored on that node, where possible
– consists of 2 phases (Map, Reduce)

Features of MapReduce
– automatic parallelization and distribution
– fault-tolerance
– status and monitoring tools
– clean abstraction for programmers (programs usually written in Java, but can use others through Hadoop Streaming)
– abstracts all of the ‘housekeeping’ away from the developer; developer can concentrate simply on writing the Map and Reduce functions

MapReduce: JobTracker
– jobs are controlled by JobTracker
– JobTracker resides on ‘master node’
– clients submit MapReduce jobs to JobTracker
– JobTracker assigns Map and Reduce tasks to other nodes on the cluster
– these nodes run a software daemon known as TaskTracker
– TaskTracker is responsible for actually instantiating the Map or Reduce task, and reporting progress back to JobTracker

Aside: MapReduce v2
– CDH4 contains ‘standard’ MapReduce (MR1); also includes MR2, also known as YARN (Yet Another Resource Negotiator)
– MR2 is not yet considered Prod ready
– Existing code will work unmodified on MR2 clusters when the technology matures; code will need to be recompiled, but the API is the same
– for Prod, recommend using MR1

MapReduce: Terminology
– job, a complete execution of Mappers and Reducers over a dataset
– task, execution of a single Mapper or Reducer over a slice of data
– task attempt, particular instance of an attempt to execute a task
– there will be at least as many task attempts as there are tasks
– if a task attempt fails, another will be started by JobTracker
– Speculative execution can also result in more task attempts than completed tasks

MapReduce: The Mapper
– Hadoop attempts to ensure that Mappers run on nodes which hold their portion of the data locally, to avoid network traffic
– Multiple Mappers run in parallel, each processing a portion of the input data
– Mapper reads data in the form of key/value pairs
– outputs zero or more key/value pairs (pseudo-code): map(in_key, in_value) -> (inter_key, inter_value) list
– Mapper may use or completely ignore the input key
– std pattern is to read a line of a file at a time
– key is the byte offset into the file at which the line starts
– value is the contents of the line itself
– typically the key is considered irrelevant
– If Mapper writes anything out, output must be in the form of key/value pairs

Example Mapper: Upper Case Mapper
– Turn input into upper case (pseudo-code): let map(k, v) = emit(k.toUpper(), v.toUpper())
– (‘foo’, ‘bar’) -> (‘FOO’, ‘BAR’)

Example Mapper: Explode Mapper
– output each input character separately (pseudo-code):
let map(k, v) = foreach char c in v: emit (k, c)
– (‘foo’, ‘bar’) -> (‘foo’, ‘b’), (‘foo’, ‘a’), (‘foo’, ‘r’)
– (‘baz’, ‘other’) -> (‘baz’, ‘o’), (‘baz’, ‘t’), (‘baz’, ‘h’), (‘baz’, ‘e’), (‘baz’, ‘r’)

Example Mapper: Filter Mapper
– only output key/value pairs where the input value is a prime number (pseudo-code):
let map(k, v) = if (isPrime(v)) then emit (k, v)
– (‘foo’, 7) -> (‘foo’, 7)
– (‘foo’, 10) -> nothing

Example Mapper: Changing Keyspaces
– key output by the Mapper does not need to be identical to the input key
– output the word length as the key (pseudo-code):
let map(k, v) = emit(v.length(), v)
– (‘foo’, ‘bar’) -> (3, ‘bar’)
– (‘baz’, ‘other’) -> (5, ‘other’)
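
The four example Mappers above are pseudo-code. A rough Python rendering is sketched below, with each Mapper written as a generator so it can emit zero or more pairs. The function names and the is_prime helper are illustrative and not part of any Hadoop API.

```python
def upper_case_mapper(key, value):
    # Upper Case Mapper: emit the upper-cased key and value
    yield key.upper(), value.upper()


def explode_mapper(key, value):
    # Explode Mapper: emit one pair per character of the value
    for ch in value:
        yield key, ch


def is_prime(n):
    # Simple trial division, good enough for a sketch
    if n < 2:
        return False
    i = 2
    while i * i <= n:
        if n % i == 0:
            return False
        i += 1
    return True


def filter_mapper(key, value):
    # Filter Mapper: emit the pair only when the value is prime
    if is_prime(value):
        yield key, value


def keyspace_mapper(key, value):
    # Changing Keyspaces: the output key is the word length
    yield len(value), value


print(list(upper_case_mapper("foo", "bar")))
print(list(explode_mapper("foo", "bar")))
print(list(filter_mapper("foo", 7)), list(filter_mapper("foo", 10)))
print(list(keyspace_mapper("baz", "other")))
```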

MapReduce: The Reducer
– After the Map phase is over, all the intermediate values for a given intermediate key are combined together into a list
– This list is given to a Reducer
– There may be a single Reducer or multiple Reducers
– This is specified as part of the job configuration (see later)
– All values associated with a particular intermediate key are guaranteed to go to the same Reducer
– The intermediate keys, and their value lists, are passed to the Reducer in sorted key order
– This step is known as the ‘shuffle and sort’
– The Reducer outputs zero or more final key/value pairs
– These are written to HDFS
– In practice, the Reducer usually emits a single key/value pair for each input key

Example Reducer: Sum Reducer
– Add up all the values associated with each intermediate key (pseudo-code):
let reduce(k, vals) =
    sum = 0
    foreach int i in vals:
        sum += i
    emit(k, sum)
– (‘bar’, [9, 3, -17, 44]) -> (‘bar’, 39)

Example Reducer: Identity Reducer
let reduce(k, vals) =
    foreach v in vals:
        emit(k, v)
– (‘bar’, [123, 100, 77]) -> (‘bar’, 123), (‘bar’, 100), (‘bar’, 77)
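
The two Reducers above, sketched in Python as generators. As before, the function names are illustrative only.

```python
def sum_reducer(key, values):
    # Sum Reducer: emit one pair holding the total of all values for the key
    yield key, sum(values)


def identity_reducer(key, values):
    # Identity Reducer: emit every value unchanged under the same key
    for v in values:
        yield key, v


print(list(sum_reducer("bar", [9, 3, -17, 44])))
print(list(identity_reducer("bar", [123, 100, 77])))
```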

MapReduce Example: Word Count
– count the number of occurrences of each word in a large amount of input data
– This is the ‘hello world’ of MapReduce programming
map(String input_key, String input_value)
    foreach word w in input_value:
        emit(w, 1)
reduce(String output_key, Iterator<int> intermediate_vals)
    set count = 0
    foreach v in intermediate_vals:
        count += v
    emit(output_key, count)
– input to the Mapper (3414 and 3437 are byte offsets, don’t care about these values)
(3414, ‘the cat sat on the mat’)
(3437, ‘the aardvark sat on the sofa’)
– output from the Mapper:
(‘the’, 1), (‘cat’, 1), (‘sat’, 1), (‘on’, 1), (‘the’, 1), (‘mat’, 1), (‘the’, 1), (‘aardvark’, 1), (‘sat’, 1), (‘on’, 1), (‘the’, 1), (‘sofa’, 1)
– intermediate data sent to the Reducer:
(‘aardvark’, [1])
(‘cat’, [1])
(‘mat’, [1])
(‘on’, [1, 1])
(‘sat’, [1, 1])
(‘sofa’, [1])
(‘the’, [1, 1, 1, 1])
– final Reducer output
(‘aardvark’, 1)
(‘cat’, 1)
(‘mat’, 1)
(‘on’, 2)
(‘sat’, 2)
(‘sofa’, 1)
(‘the’, 4)
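
The whole word count dataflow can be simulated on one machine. The sketch below is not Hadoop code: shuffle_and_sort is a stand-in for what the framework does between the two phases, and all function names are made up for illustration.

```python
from collections import defaultdict


def map_word_count(offset, line):
    # The byte offset key is ignored, as in the notes
    for word in line.split():
        yield word, 1


def shuffle_and_sort(pairs):
    # Group all values by intermediate key, then order by key
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return sorted(grouped.items())


def reduce_word_count(word, counts):
    yield word, sum(counts)


def run_word_count(records):
    intermediate = [
        pair
        for offset, line in records
        for pair in map_word_count(offset, line)
    ]
    output = []
    for word, counts in shuffle_and_sort(intermediate):
        output.extend(reduce_word_count(word, counts))
    return output


records = [
    (3414, "the cat sat on the mat"),
    (3437, "the aardvark sat on the sofa"),
]
for word, count in run_word_count(records):
    print(word, count)
```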

MapReduce: Data Locality
– Whenever possible, Hadoop will attempt to ensure that a Map task on a node is working on a block of data stored locally on that node via HDFS
– If this is not possible, the Map task will have to transfer the data across the network as it processes that data
– once the Map tasks have finished, data is then transferred across the network to the Reducers
– Although the Reducers may run on the same physical machines as the Map tasks, there’s no concept of data locality for the Reducers
– All Mappers will, in general, have to communicate with all Reducers

MapReduce: Is Shuffle and Sort a Bottleneck?
– it appears that the shuffle and sort phase is a bottleneck
– the reduce method in the Reducers cannot start until all Mappers have finished
– in practice, Hadoop will start to transfer data from Mappers to Reducers as the Mappers finish work
– this mitigates against a huge amount of data transfer starting as soon as the last Mapper finishes
– note that this behavior is configurable
– the developer can specify the % of Mappers which should finish before Reducers start retrieving data
– the developer’s reduce method still does not start until all intermediate data has been transferred and sorted

MapReduce: Is a Slow Mapper a Bottleneck?
– it is possible for one Map task to run more slowly than the others
– perhaps due to faulty hardware, or just a very slow machine
– it would appear this would create a bottleneck
– the reduce method in the Reducer cannot start until every Mapper has finished
– Hadoop uses speculative execution to mitigate against this
– if a Mapper appears to be running significantly more slowly than the others, a new instance of the Mapper will be started on another machine, operating on the same data
– the results of the first Mapper to finish will be used
– Hadoop will kill off the Mapper which is still running

Creating and Running a MapReduce Job
– write the Mapper and Reducer classes
– write a Driver class that configures the job and submits it to the cluster
– compile the Mapper, Reducer, and Driver classes
– javac -classpath `hadoop classpath` *.java
– create a jar file with the Mapper, Reducer, and Driver classes
– jar cvf foo.jar *.class
– run the hadoop jar command to submit the job to the Hadoop cluster
– hadoop jar foo.jar Foo in_dir out_dir
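
The steps above are for Java. For other languages the notes mention Hadoop Streaming, where the Mapper and Reducer are ordinary programs that read lines on stdin and write tab-separated key/value lines on stdout. Below is a sketch of word count in that style. The function names are hypothetical, and the functions take any iterable of lines so they can be tried without a cluster.

```python
from itertools import groupby


def streaming_map(lines):
    # Emit "word<TAB>1" for every word on every input line
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"


def streaming_reduce(sorted_lines):
    # Input lines arrive sorted by key, so equal keys are adjacent
    pairs = (line.rstrip("\n").split("\t", 1) for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        total = sum(int(count) for _, count in group)
        yield f"{word}\t{total}"


# Local dry run: sorted() stands in for the framework's shuffle and sort
mapped = list(streaming_map(["the cat sat", "the mat"]))
for line in streaming_reduce(sorted(mapped)):
    print(line)
```

In an actual Streaming job each function would live in its own script that loops over sys.stdin and prints its results.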

## Remove directory in hadoop
$ hadoop fs -rmr wordcounts
## List running MapReduce jobs, then kill one by job ID
$ mapred job -list
$ mapred job -kill job_201301212044_003

Installing A Hadoop Cluster
– performed by sys admin; Cloudera offers course
– very useful to understand how the component parts of the Hadoop cluster work together
– a developer will configure their machine to run pseudo-distributed mode
* this effectively creates a single-machine cluster
* all 5 Hadoop daemons are running on the same machine
* very useful for testing code before it is deployed to the real cluster
– easiest way to d/l and install Hadoop, either for a full cluster or in pseudo-distributed mode, is by using Cloudera’s Distribution including Apache Hadoop (CDH)
* vanilla Hadoop plus many patches, backports, bugfixes
* supplied as Debian pkg and RPM and as tarball
* full doc http://cloudera.com/

5 Hadoop Daemons
– NameNode – Holds the metadata for HDFS
– Secondary NameNode – performs housekeeping functions for the NameNode; is NOT a backup or hotstandby for the NameNode!
– DataNode – stores actual HDFS data blocks
– JobTracker – manages MapReduce jobs, distributes individual tasks to machines running the…
– TaskTracker – instantiates and monitors individual Map and Reduce tasks

– Each daemon runs in its own JVM
– no node on a real cluster will run all 5 daemons (although technically possible)
– we can consider nodes to be in 2 different categories:
* Master Nodes
– Run the NameNode, Secondary NameNode, JobTracker daemons
– only one of each of these daemons runs on the cluster
* Slave nodes
– Run the DataNode and TaskTracker daemons
* A slave node will run both of these daemons

Basic Cluster Configuration
Master Nodes
(JobTracker) (NameNode) (Secondary NameNode)
Slave Nodes
(TaskTracker DataNode) (TaskTracker DataNode) … (TaskTracker DataNode)
– on very small cluster, the NameNode, JobTracker and Secondary NameNode daemons can all reside on a single machine
* It is typical to put them on separate machines as the cluster grows beyond 20-30 nodes
– each daemon runs in a separate JVM

Submitting a Job
Client -> XML package, jar file -> JobTracker -> (DataNode TaskTracker Task) or more
– the intermediate data is held on the TaskTracker’s local disk
– as Reducers start up, the intermediate data is distributed across the network to the Reducers
– Reducers write their final output to HDFS
– once the job has completed, the TaskTracker can delete the intermediate data from its local disk
* note that the intermediate data is not deleted until the entire job completes

Hive (developed at Facebook)
– abstraction on top of MapReduce
– allows users to query data in the Hadoop cluster without knowing Java or MapReduce
– uses the HiveQL language (similar to SQL)
– Hive interpreter runs on a client machine
* turns HiveQL queries into MapReduce jobs
* submits those jobs to the cluster
– Note: this does not turn the cluster into a relational database server!
* it is still simply running MapReduce jobs
* those jobs are created by the Hive Interpreter
– Sample Hive query:
SELECT stock.product, SUM(orders.purchases)
FROM stock JOIN orders
ON (stock.id = orders.stock_id)
WHERE orders.quarter = 'Q1'
GROUP BY stock.product;

Pig (developed at Yahoo)
– alternative abstraction on top of MapReduce
– uses dataflow scripting language, called PigLatin
– Pig interpreter runs on the client machine
* takes the PigLatin script and turns it into a series of MapReduce jobs
* submits those jobs to the cluster
– as with Hive, nothing ‘magical’ happens on the cluster, simply running MapReduce jobs
– Sample Pig script:
stock = LOAD '/user/fred/stock' AS (id, item);
orders = LOAD '/user/fred/orders' AS (id, cost);
grpd = GROUP orders BY id;
totals = FOREACH grpd GENERATE group,
SUM(orders.cost) AS t;
result = JOIN stock BY id, totals BY group;
DUMP result;

Impala
– open source project created by Cloudera
– facilitates real-time queries of data in HDFS
– does not use MapReduce
* uses its own daemon, running on each slave node
* queries data stored in HDFS
– uses a language very similar to HiveQL
* but produces results much, much faster (5-40x faster than Hive)
– currently in beta

Flume and Sqoop
– Flume provides a method to import data into HDFS as it is generated
* instead of batch-processing the data later
* for ex: log files from a Web server
– Sqoop provides a method to import data from tables in a relational database into HDFS
* does this very efficiently via a Map-only MapReduce job
* can also ‘go the other way’
– populate database tables from files in HDFS

Oozie
– allows developers to create a workflow of MapReduce jobs, including dependencies between jobs
– Oozie server submits the jobs to the cluster in the correct sequence

HBase (modeled after Google’s Big Table)
– HBase is ‘the Hadoop database’
– a ‘NoSQL’ datastore
– can store massive amounts of data
– GB, TB, PB of data in a table
– scales to provide very high write throughput
* hundreds of thousands of inserts per second
– copes well with sparse data
* tables can have many thousands of columns, even if most columns are empty for any given row
– has a very constrained access model
– insert a row, retrieve a row, do a full or partial table scan
– only one column (the ‘row key’) is indexed

Common MapReduce Algorithms
– MapReduce jobs tend to be relatively short in terms of lines of code
– typical to combine multiple small MapReduce jobs together in a single workflow, often using oozie
– many MapReduce jobs use similar code
– Sorting and Searching large data sets
* MapReduce is very well suited to sorting large data sets
* recall: keys are passed to the Reducer in sorted order
* Assuming the file to be sorted contains lines with a single value:
– Mapper is merely the identity function for the value
(k, v) -> (v, _)
map(0, “zebra”) -> emit(“zebra”, _)
map(5, “ant”) -> emit(“ant”, _)
– Reducer is the identity function
(k, _) -> (k, ‘’)
reduce(“ant”, [ _ ]) -> emit(“ant”, ‘’)
reduce(“zebra”, [ _ ]) -> emit(“zebra”, ‘’)
– trivial with a single Reducer
– If multiple Reducers, then output will probably look like this
* Reducer 1: aardvark, bee, cobra, zebra
* Reducer 2: ant, bear, cat, yak
– Solution: Write a custom partitioner
# simple example (pseudo code)
if key<“c” then reducer=1
else reducer=2
– output:
* Reducer 1: aardvark, ant, bear, bee
* Reducer 2: cat, cobra, yak, zebra
– sorting is frequently used as a speed test for a Hadoop cluster
* Mapper and Reducer are trivial
– Therefore, sorting is effectively testing the Hadoop framework’s I/O
– good way to measure the increase in performance if you enlarge your cluster
* run and time a sort job before and after you add more nodes
* terasort is one of the sample jobs provided with Hadoop
– creates and sorts very large files
– /usr/lib/hadoop-0.20-mapreduce/hadoop-examples.jar terasort
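
The custom partitioner idea from the sorting notes above can be tried out in a few lines. This sketch hard-codes two reducers and the same split point as the pseudo-code (keys before “c” go to reducer 1). The function names are illustrative; a real job would subclass Hadoop’s Partitioner in Java.

```python
def partition(key):
    # Range partitioner from the notes: keys before "c" go to reducer 1
    return 1 if key < "c" else 2


def run_sort(words):
    reducers = {1: [], 2: []}
    for word in words:
        reducers[partition(word)].append(word)
    # Each reducer receives its own keys in sorted order
    return {r: sorted(keys) for r, keys in reducers.items()}


words = ["zebra", "ant", "cobra", "bee", "yak", "aardvark", "cat", "bear"]
result = run_sort(words)
print("Reducer 1:", ", ".join(result[1]))
print("Reducer 2:", ", ".join(result[2]))
```

Because every key in reducer 1 sorts before every key in reducer 2, concatenating the reducer outputs in order gives a globally sorted result.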
– Searching
– assume input is set of files containing lines of text
– assume Mapper has been passed the pattern for which to search as a special parameter
* we saw how to pass parameters to your Mapper previously
– algorithm
* Mapper compares the line against the pattern
* if the pattern matches, Mapper outputs (line, _)
– Or (filename+line, _), or …
* if the pattern does not match, Mapper outputs nothing
* Reducer is the Identity Reducer
– just outputs each intermediate key
– see also /usr/lib/hadoop-0.20-mapreduce/hadoop-examples.jar grep
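
A minimal sketch of the search algorithm above. Here the pattern is passed as a plain function argument, which is an assumption of this sketch; in a real job it would reach the Mapper through the job configuration. The function names and sample file names are made up.

```python
import re


def grep_mapper(filename, line, pattern):
    # Emit (filename:line, None) when the pattern matches, otherwise nothing
    if re.search(pattern, line):
        yield f"{filename}:{line}", None


def key_only_reducer(key, values):
    # Reducer just outputs each intermediate key once
    yield key


records = [
    ("a.txt", "error: disk full"),
    ("a.txt", "all good"),
    ("b.txt", "fatal error"),
]
intermediate = sorted(
    key
    for filename, line in records
    for key, _ in grep_mapper(filename, line, r"error")
)
matches = [out for key in intermediate for out in key_only_reducer(key, [None])]
print(matches)
```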
Secondary Sort: Motivation
– keys are passed to the Reducer in sorted order
– list of values for a particular key is not sorted; order may well change between different runs of the MapReduce job
– sometimes a job needs to receive the values for a particular key in sorted order; this is known as a secondary sort
– Ex: your reducer will emit the largest value produced by Mappers for each different key
– naive solution (in Reducer, loop through all values, keeping track of the largest, finally emit largest value)
– Better solution
* arrange for the values in a given key to be presented to the Reducer in sorted, descending order
* Reducer just needs to read and emit the first value it is given for a key
– Ex:
* No secondary sort
– reduce(“a”, [16, 9, 17, 2])
* Secondary Sort (descending)
– reduce(“a”, [17, 16, 9, 2])
Aside: Comparator Classes
– Comparator classes are classes that compare objects
– custom comparators can be used in a secondary sort to compare composite keys
* Before: a#16, a#2, a#17, a#9
* After: a#17, a#16, a#9, a#2
– Grouping comparators can be used in a secondary sort to ensure that only the natural key is used for partitioning and grouping
* Before: reduce(a#16, [16]), reduce(a#2, [2]), reduce(a#17, [17]), reduce(a#9, [9])
* After: reduce(a#17, [17, 16, 9, 2])
Implementing the Secondary Sort
– to implement, the intermediate key should be a composite of the ‘actual’ (natural) key and the value
– define a Partitioner which partitions just on the natural key
– define a comparator class which sorts on the entire composite key
* ensures that the keys are passed to the Reducer in the desired order
* Orders by natural key and, for the same natural key, on the value portion of the key
* specified in the driver code by: job.setSortComparatorClass (MyOKCC.class);
– now we know that all the values for the same natural key will go to the same Reducer
* and they will be in the order we desire
– we must now ensure that all the values for the same natural key are passed in one call to the Reducer
– achieved by defining a Grouping Comparator class
* determines which keys and values are passed in a single call to the Reducer
* looks at just the natural key
* specified in the driver code by: job.setGroupingComparatorClass (MyOVGC.class);
Ex: Assume we have input with (key, value) pairs like this:
a 17
b 12
f 4
f 5
b 8
f 22
a 16
– We want the Reducer to receive the intermediate data for each key in descending numerical order
– Write the Mapper such that the intermediate key is a composite of the natural key and value
* Ex. intermediate output may look like this:
(‘a#17’, 17)
(‘b#12’, 12)
(‘f#4’, 4)
(‘f#5’, 5)
(‘b#8’, 8)
(‘f#22’, 22)
(‘a#16’, 16)
– write a class that extends WritableComparator and sorts on natural key, and for identical natural keys, sorts on the value portion in desc order
* just override compare (WritableComparable, WritableComparable)
* supply a reference to this class in your driver using the Job.setSortComparatorClass method (JobConf.setOutputKeyComparatorClass in the older API)
* will result in keys being passed to the Reducer in this order
(‘a#17’, 17)
(‘a#16’, 16)
(‘b#12’, 12)
(‘b#8’, 8)
(‘f#22’, 22)
(‘f#5’, 5)
(‘f#4’, 4)
– Finally, write another WritableComparator subclass which just examines the first(‘natural’) portion of the key
* again, just override compare (WritableComparable, WritableComparable)
* supply a reference to this class in your driver using the Job.setGroupingComparatorClass method (JobConf.setOutputValueGroupingComparator in the older API)
– this will ensure that values associated with the same natural key are passed in the same call to the Reducer
* but they’re sorted in descending order, as we required
* reduce(‘a#17’, [17, 16])
* reduce(‘b#12’, [12, 8])
* reduce(‘f#22’, [22, 5, 4])
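
The secondary sort example above can be simulated locally. In this sketch the composite key is a (natural key, value) tuple rather than an ‘a#17’ string, sort_key plays the role of the sort comparator, and group_key plays the role of the grouping comparator. All names are illustrative, and the Partitioner is left out because a single Reducer is assumed.

```python
from itertools import groupby

records = [("a", 17), ("b", 12), ("f", 4), ("f", 5), ("b", 8), ("f", 22), ("a", 16)]


def mapper(key, value):
    # Intermediate key is a composite of the natural key and the value
    yield (key, value), value


def sort_key(composite):
    # Sort comparator: natural key ascending, value portion descending
    natural, value = composite
    return (natural, -value)


def group_key(composite):
    # Grouping comparator: look at the natural key only
    return composite[0]


def secondary_sort(records):
    intermediate = [pair for k, v in records for pair in mapper(k, v)]
    intermediate.sort(key=lambda kv: sort_key(kv[0]))
    calls = []
    for _, group in groupby(intermediate, key=lambda kv: group_key(kv[0])):
        group = list(group)
        # The reduce call sees the first composite key of the group
        calls.append((group[0][0], [v for _, v in group]))
    return calls


for (natural, value), values in secondary_sort(records):
    print(f"reduce('{natural}#{value}', {values})")
```

The Reducer can then emit the first value of each list to get the largest value per natural key.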

Inverted Index
– assume the input is a set of files containing lines of text
– key is the byte offset of the line, value is the line itself
– retrieve the name of the file using Context object
Inverted Index Algorithm
– Mapper (for each word in the line, emit (word, filename))
– Reducer
* identity function
– collect together all values for a given key (i.e. all filenames for a particular word)
– emit (word, filename_list)
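
A local sketch of the inverted index algorithm above. The file name is passed in directly here, whereas a real Mapper would retrieve it from the Context object. Deduplicating and sorting the file list in the Reducer is a choice made for this sketch, not something the notes specify.

```python
from collections import defaultdict


def index_mapper(filename, line):
    # For each word in the line, emit (word, filename)
    for word in line.split():
        yield word, filename


def index_reducer(word, filenames):
    # Collect all filenames for the word (deduplicated in this sketch)
    yield word, sorted(set(filenames))


def build_index(lines):
    grouped = defaultdict(list)
    for filename, line in lines:
        for word, fname in index_mapper(filename, line):
            grouped[word].append(fname)
    index = {}
    for word in sorted(grouped):
        for key, file_list in index_reducer(word, grouped[word]):
            index[key] = file_list
    return index


lines = [("a.txt", "the cat sat"), ("b.txt", "the dog sat"), ("a.txt", "the end")]
for word, files in build_index(lines).items():
    print(word, files)
```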
Inverted Index: Dataflow
Aside: Word Count
– recall the WordCount example we used earlier
* For each word, mapper emitted (word, 1)
* very similar to inverted index
– common theme: reuse of existing Mappers, with minor modifications

Term Frequency – Inverse Document Frequency
– answers question, “How important is this term in a document?”
– known as a term weighting function (assigns a score(weight) to each term (word) in a document)
– very commonly used in text processing and search
– has many applications in data mining
TF-IDF: Motivation
– merely counting the # of occurrences of a word in a document is not a good enough measure of its relevance
* if word appears in many other docs, it is probably less relevant
* some words appear too frequently in all docs to be relevant, known as ‘stopwords’
– considers both the freq of a word in a given doc and the number of docs which contain the word
TF-IDF: Data Mining Example
– consider a music recommendation service
* given many users’ music libraries, provide “you may also like” suggestions
– if user A and user B have similar libraries, user A may like an artist in user B’s library
* but some artists will appear in almost everyone’s library, and should therefore be ignored when making recommendations
– almost everyone has The Beatles in their record collection!
TF-IDF Formally Defined
– Term Frequency (TF) – # of times a term appears in a document (i.e. the count)
– Inverse Document Frequency (IDF)
idf = log (N/n); N: total number of documents, n: # of docs that contain a term
– What we need
* # of times t appears in a doc (different value for each doc)
* # of docs that contains t (one value for each term)
* total number of docs (one value)
Computing TF-IDF with MapReduce
– overview of algorithm: 3 MapReduce jobs
– job1: compute term frequencies
– job2: compute number of docs each word occurs in
– job3: compute TF-IDF
– notation used below:
– docid = a unique ID for each doc
– contents = the complete text of each doc
– N = total # of docs
– term = a term (word) found in the doc
– tf = term frequency
– n = # of docs a term appears in
– note that real-world systems typically perform ‘stemming’ on terms
* removal of plurals, tense, possessives, etc.
– Job1 – compute tf
– Mapper
* input: (docid, contents)
* for each term in the doc, generate a (term, docid) pair
– i.e. we have seen this term in this doc once
* output: ((term, docid), 1)
– Reducer
* sums counts for word in doc
* outputs ((term, docid), tf)
– i.e. the term frequency of term in docid is tf
– We can add a Combiner, which will use the same code as the Reducer
– Job2 – compute n
– Mapper
* input: ((term,docid), tf)
* output: (term, (docid, tf, 1))
– Reducer
* sums 1s to compute n (# of docs containing the term)
* Note: need to buffer (docid, tf) pairs while we are doing this
* Outputs ((term, docid), (tf, n))
– Job3 – compute TF-IDF
– Mapper
* input: ((term, docid), (tf, n))
* assume N is known (easy to find)
* output ((term, docid), TFxIDF)
– Reducer
* the identity function
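
The three jobs can be chained locally to check the arithmetic. This is a sketch only: each function collapses a Mapper/Reducer pair into plain Python, the natural log is assumed because the notes do not give a base, no stemming or stopword removal is done, and the three sample documents are made up.

```python
import math
from collections import Counter, defaultdict


def job1_term_frequencies(docs):
    # Job1: ((term, docid), tf)
    tf = Counter()
    for docid, contents in docs.items():
        for term in contents.split():
            tf[(term, docid)] += 1
    return dict(tf)


def job2_document_counts(tf):
    # Job2: buffer (docid, tf) pairs per term, then attach n to each
    by_term = defaultdict(list)
    for (term, docid), count in tf.items():
        by_term[term].append((docid, count))
    out = {}
    for term, postings in by_term.items():
        n = len(postings)  # number of docs containing the term
        for docid, count in postings:
            out[(term, docid)] = (count, n)
    return out


def job3_tf_idf(tf_n, total_docs):
    # Job3: tf x log(N / n) for every (term, docid)
    return {
        key: tf * math.log(total_docs / n)
        for key, (tf, n) in tf_n.items()
    }


docs = {"d1": "the cat sat on the mat", "d2": "the dog sat", "d3": "the bird"}
scores = job3_tf_idf(job2_document_counts(job1_term_frequencies(docs)), len(docs))
for key in [("the", "d1"), ("sat", "d1"), ("cat", "d1")]:
    print(key, round(scores[key], 3))
```

A term such as “the” that appears in every document scores 0, which is the stopword effect described in the motivation.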

Major components of an HBase cluster:
– ZooKeeper (centralized service used to maintain config info for HBase)
– catalog tables (keep track of the locations of region servers and regions)
– master (monitors all region server instances in the cluster; interface for all metadata changes)
– region server (responsible for serving and managing regions)
– region (a set of rows belonging to a table, 256MB default size; 1-4GB recommended)
