Introduction to Hadoop notes

Hadoop – distributed storage/compute system

2 components
HDFS – storage
MapReduce – compute

data is split into 64MB or 128MB blocks

Traditional RDBMS – schema on write; can’t handle unstructured data; reads are fast; structure enables governance

Hadoop – Schema on read; any kind of data file

Flexibility, Scalability, Economics (keeping more data online)

Volume (amount of data), Velocity (growth of data), Variety (different sources of data)

Open Time Series Database

Multiples of:
1 HD + 2 cores + 6-8GB RAM tend to work well

Slave Nodes:
Hyperthreading should be enabled
Disk/network i/o are typical bottlenecks
Slave node config specifies max # of Map and Reduce tasks that can run simultaneously on that node
Each Map or Reduce task will take 1GB to 2GB of RAM
Should not use virtual memory
Ensure enough RAM to run all tasks, plus overhead for DataNode and TaskTracker daemons, plus OS
Rule of thumb for memory: total # of tasks = 1.5 x # logical processor cores
More spindles is better
4-12 disks per node
Use 3.5″ disks (faster, cheaper, higher capacity than 2.5″)
7200 RPM SATA drives are fine
8 x 1.5TB is better than 6 x 2TB
Practical max is 24TB per slave node (more than that will result in massive network traffic if the node dies and block re-replication takes place)
No benefit from RAID
HDFS provides built-in redundancy
RAID striping (RAID0) is slower than the JBOD config used by HDFS; a RAID array is limited by its slowest disk
Disk ops on JBOD are independent, so the avg speed is greater than that of the slowest disk
Virtualization is not worth considering
Blade servers not recommended
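
The memory guidance for slave nodes above can be turned into a quick sizing calculation. The following is a minimal sketch, not an official formula: the function name and the 2GB allowances for the daemons and the OS are assumptions made for illustration; only the 1.5 x logical cores rule and the 1GB to 2GB per task figure come from the notes.

```python
def slave_node_ram_gb(logical_cores, gb_per_task=2.0,
                      daemon_overhead_gb=2.0, os_overhead_gb=2.0):
    """Estimate the RAM a slave node needs so that no task has to swap.

    daemon_overhead_gb (DataNode + TaskTracker) and os_overhead_gb are
    illustrative assumptions, not values given in the notes.
    """
    # Rule of thumb from the notes: total task slots = 1.5 x logical cores.
    task_slots = int(1.5 * logical_cores)
    # Each Map or Reduce task takes 1GB to 2GB; default to the upper bound.
    task_ram_gb = task_slots * gb_per_task
    return task_slots, task_ram_gb + daemon_overhead_gb + os_overhead_gb


slots, ram_gb = slave_node_ram_gb(logical_cores=16)
print(slots, ram_gb)
```

Passing gb_per_task=1.0 gives the lower bound of the same estimate.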

Master Nodes:
Single point of failure
If NameNode goes down, cluster is inaccessible
If JobTracker goes down, no jobs can run on cluster. Currently running jobs will fail.
Carrier-class h/w (dual power supplies, dual ethernet cards, RAIDed HDs, RAM >32GB)

Network considerations:
Hadoop is bw intensive; DataNodes send heartbeats to the NameNode every 3s
Use dedicated switches for cluster
Nodes connected at top of rack switch, min 1Gb/s
If large amts of intermediate data are generated, consider 10Gb/s (alternative is to bond 2x 1Gb/s interfaces)

Node configuration:
CentOS, RHEL; RHEL on master nodes, CentOS on slaves
Do not use Linux LVM
Check BIOS settings (e.g. SATA drives should not have IDE emulation enabled)
Test disk i/o speed with “hdparm -t /dev/sda1”; should see >70MB/s, anything lower indicates problems
Hadoop has no specific disk partitioning reqts
Mount disks with noatime option
Common directory structure for data mount points (4 disk example):
– /data/1/dfs/nn
– /data/2/dfs/dn
– /data/3/dfs/snn
– /data/4/mapred/loc
Set vm.swappiness to 0 or 5 in sysctl.conf
Cloudera recommends ext3/ext4
XFS provides some performance benefit during kickstart (formats in 0s)
XFS has some perf issues (slow deletes, problems when machine runs out of memory in some versions)
Increase nofile ulimit for mapred, hdfs users to 32k in limits.conf
Disable IPv6
Disable SELinux
Install NTP
Java JDK 1.6.0u24 or 1.6.0u26
http://hadoop.apache.org (Vanilla base)
Or Cloudera’s Distribution including Apache Hadoop (CDH) starts with the latest stable Hadoop distribution
– interoperability, patches, bug fixes, more tools
– CDH does integration testing; its Hadoop version may lag a bit behind Apache’s because it undergoes more testing

Hadoop concepts
– Open source project by Apache Software Foundation
– Originally based on papers published by Google in 2003, 2004
– 2 core components (HDFS, MapReduce)
– Other projects based around core Hadoop (Pig, Hive, HBase, Flume, Oozie, Sqoop, etc.)
– Hadoop cluster (set of machines running HDFS, MapReduce)
– individual machines are known as nodes
– cluster can have 1 or more nodes

HDFS (written in Java)
– storing data
– data is split into blocks and distributed across multiple nodes in the cluster (block is typically 64MB or 128MB)
– each block is replicated many times (default = 3x, replicas are stored on different nodes)
– provides durability, fault tolerance, data throughput
– sits on top of native fs such as ext3/4, or xfs
– provides redundant storage for massive amounts of data
– performs best with a ‘modest’ number of files (millions rather than billions; each file is 100MB or more)
– files in HDFS are ‘write once’; no random writes to files are allowed
– optimized for large, streaming reads of files rather than random reads
– blocks are stored as standard files on the DataNodes in a set of directories specified in Hadoop’s config files
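
The block splitting and replication described above lend themselves to a small worked calculation. This is a rough sketch under the defaults mentioned in the notes (128MB blocks, 3x replication); the function name is hypothetical. It relies on the fact that the last block of a file only occupies as much space as the data it holds.

```python
import math


def hdfs_footprint(file_size_mb, block_size_mb=128, replication=3):
    """Return (blocks, block replicas, raw MB stored) for one file."""
    # A file is split into fixed-size blocks; the last one may be partial.
    blocks = math.ceil(file_size_mb / block_size_mb)
    # Every block is stored `replication` times, on different nodes.
    block_replicas = blocks * replication
    # A partial last block is not padded, so raw usage is size x replication.
    raw_mb = file_size_mb * replication
    return blocks, block_replicas, raw_mb


print(hdfs_footprint(1000))                    # 128MB blocks
print(hdfs_footprint(1000, block_size_mb=64))  # 64MB blocks
```

The number of blocks matters because the NameNode has to track each one in its metadata, which is part of why HDFS prefers a modest number of large files.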

MapReduce
– system used to process data in the Hadoop cluster
– 2 phases (Map, then Reduce); between the 2 phases is stage known as “shuffle and sort”
– each Map task operates on a discrete portion of the overall dataset, typically 1 HDFS block of data
– after all Maps are complete, the MapReduce system distributes the intermediate data to nodes which perform the reduce phase

How files are stored
– Each block is 64 or 128MB
– Data is distributed across many machines at load time
– Blocks are replicated across multiple machines, known as DataNodes
– Master node is called the NameNode which keeps track of which blocks make up a file and where these blocks are located (known as metadata)

HDFS NameNode
– NameNode daemon must be running at all times or else cluster will be inaccessible
– NameNode holds all of its metadata in RAM for fast access; keeps record of changes on disk for crash recovery
– separate Secondary NameNode takes care of housekeeping tasks for NameNode, but is NOT a backup to NameNode!

NameNode High Availability in CDH4
– CDH4 introduced HA for the NameNode
– Instead of single NameNode, there are now 2 (active, standby)
– If Active fails, Standby NameNode can automatically take over
– Standby NameNode does the work performed by the Secondary NameNode in ‘classic’ HDFS
– HA HDFS does not run a Secondary NameNode daemon
– your sysadmin will choose whether to set up cluster with NameNode HA or not

When a client app reads a file:
– it communicates with NameNode to determine which blocks make up file and which DataNodes those blocks reside on
– it then communicates directly with the DataNodes to read the data
– NameNode will not be a bottleneck
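
The read path above can be illustrated with a toy in-memory model. This is only a sketch of the idea, not the HDFS client API: the dictionaries, block IDs, and node names are all hypothetical. It shows that the NameNode only answers the metadata lookup, while the file contents come straight from the DataNodes.

```python
# Toy metadata, as a NameNode might hold it in RAM (all names hypothetical):
# file path -> ordered list of (block id, DataNodes holding a replica).
NAMENODE_METADATA = {
    "/user/fred/bar.txt": [
        ("blk_1", ["datanode1", "datanode2", "datanode3"]),
        ("blk_2", ["datanode2", "datanode3", "datanode4"]),
    ]
}

# Toy block storage on each DataNode: node -> {block id: bytes}.
DATANODE_STORAGE = {
    "datanode1": {"blk_1": b"hello "},
    "datanode2": {"blk_1": b"hello ", "blk_2": b"world"},
    "datanode3": {"blk_1": b"hello ", "blk_2": b"world"},
    "datanode4": {"blk_2": b"world"},
}


def read_file(path):
    # Step 1: ask the NameNode which blocks make up the file and where they are.
    block_locations = NAMENODE_METADATA[path]
    data = b""
    for block_id, datanodes in block_locations:
        # Step 2: read each block directly from a DataNode holding a replica.
        data += DATANODE_STORAGE[datanodes[0]][block_id]
    return data


print(read_file("/user/fred/bar.txt"))
```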

Accessing HDFS
– applications can read/write HDFS files directly via Java API
– typically, files are created on local filesystem and must be moved into HDFS
– files stored in HDFS may need to be moved to a machine’s local filesystem
– access to HDFS from the cmd line is achieved through “hadoop fs” cmd

Hadoop fs Examples (copyFromLocal is synonym for put; copyToLocal is synonym for get)
# Copy file foo.txt from local disk to user’s dir in HDFS (copy file to /user/username/foo.txt)
hadoop fs -put foo.txt foo.txt
# Get a dir listing of user’s home dir in HDFS
hadoop fs -ls
# Get a dir listing of HDFS root dir
hadoop fs -ls /
# Display contents of HDFS file /user/fred/bar.txt
hadoop fs -cat /user/fred/bar.txt
# Copy that file to local disk, named baz.txt
hadoop fs -get /user/fred/bar.txt baz.txt
# Create dir called “input” under the user’s home dir
hadoop fs -mkdir input
# Delete dir “input_old” and all of its contents
hadoop fs -rm -r input_old

How MapReduce works
– distributing a task across multiple nodes
– each node processes data stored on that node, where possible
– consists of 2 phases (Map, Reduce)

Features of MapReduce
– automatic parallelization and distribution
– fault-tolerance
– status and monitoring tools
– clean abstraction for programmers (programs usually written in Java, but can use others through Hadoop Streaming)
– abstracts all of the ‘housekeeping’ away from the developer; developer can concentrate simply on writing the Map and Reduce functions

MapReduce: JobTracker
– jobs are controlled by JobTracker
– Jobtracker resides on ‘master node’
– clients submit MapReduce jobs to JobTracker
– JobTracker assigns Map and Reduce tasks to other nodes on the cluster
– these nodes run a software daemon known as TaskTracker
– TaskTracker is responsible for actually instantiating the Map or Reduce task, and reporting progress back to JobTracker

Aside: MapReduce v2
– CDH4 contains ‘standard’ MapReduce (MR1); also includes MR2, also known as YARN (Yet Another Resource Negotiator)
– MR2 is not yet considered Prod ready
– Existing code will work with no mod on MR2 clusters when technology matures; code will need to be recompiled, but API is the same
– for Prod, recommend using MR1

MapReduce: Terminology
– job, a complete execution of Mappers and Reducers over a dataset
– task, execution of a single Mapper or Reducer over a slice of data
– task attempt, particular instance of an attempt to execute a task
– there will be at least as many task attempts as there are tasks
– if a task attempt fails, another will be started by JobTracker
– Speculative execution can also result in more task attempts than completed tasks

MapReduce: The Mapper
– Hadoop attempts to ensure that Mappers run on nodes which hold their portion of the data locally, to avoid network traffic
– Multiple Mappers run in parallel, each processing a portion of the input data
– Mapper reads data in the form of key/value pairs
– outputs zero or more key/value pairs (pseudo-code): map(in_key, in_value) -> (inter_key, inter_value) list
– Mapper may use or completely ignore the input key
– std pattern is to read a line of a file at a time
– key is the byte offset into the file at which the line starts
– value is the contents of the line itself
– typically the key is considered irrelevant
– If Mapper writes anything out, output must be in the form of key/value pairs

Example Mapper: Upper Case Mapper
– Turn input into upper case (pseudo-code): let map(k, v) = emit(k.toUpper(), v.toUpper())
– (‘foo’, ‘bar’) -> (‘FOO’, ‘BAR’)

Example Mapper: Explode Mapper
– output each input character separately (pseudo-code):
let map(k, v) = foreach char c in v: emit (k, c)
– (‘foo’, ‘bar’) -> (‘foo’, ‘b’), (‘foo’, ‘a’), (‘foo’, ‘r’)
– (‘baz’, ‘other’) -> (‘baz’, ‘o’), (‘baz’, ‘t’), (‘baz’, ‘h’), (‘baz’, ‘e’), (‘baz’, ‘r’)

Example Mapper: Filter Mapper
– only output key/value pairs where the input value is a prime number (pseudo-code):
let map(k, v) = if (isPrime(v)) then emit (k, v)
– (‘foo’, 7) -> (‘foo’, 7)
– (‘foo’, 10) -> nothing

Example Mapper: Changing Keyspaces
– key output by the Mapper does not need to be identical to the input key
– output the word length as the key (pseudo-code):
let map(k, v) = emit(v.length(), v)
– (‘foo’, ‘bar’) -> (3, ‘bar’)
– (‘baz’, ‘other’) -> (5, ‘other’)
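
The four example Mappers above can be written as runnable Python generators, where each yield plays the role of emit. This is an illustrative sketch of the pseudo-code only, not Hadoop's Mapper API; the function names and the is_prime helper are made up for the example.

```python
def upper_case_mapper(k, v):
    # Turn both key and value into upper case.
    yield k.upper(), v.upper()


def explode_mapper(k, v):
    # Emit one pair per character of the value.
    for c in v:
        yield k, c


def is_prime(n):
    # Simple trial division; fine for small illustrative inputs.
    if n < 2:
        return False
    return all(n % d for d in range(2, int(n ** 0.5) + 1))


def filter_mapper(k, v):
    # Emit the pair only when the value is prime; otherwise emit nothing.
    if is_prime(v):
        yield k, v


def changing_keyspaces_mapper(k, v):
    # The output key (word length) is unrelated to the input key.
    yield len(v), v


print(list(upper_case_mapper("foo", "bar")))
print(list(explode_mapper("foo", "bar")))
print(list(filter_mapper("foo", 7)), list(filter_mapper("foo", 10)))
print(list(changing_keyspaces_mapper("baz", "other")))
```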

MapReduce: The Reducer
– After the Map phase is over, all the intermediate values for a given intermediate key are combined together into a list
– This list is given to a Reducer
– There may be a single Reducer or multiple Reducers
– This is specified as part of the job configuration (see later)
– All values associated with a particular intermediate key are guaranteed to go to the same Reducer
– The intermediate keys, and their value lists, are passed to the Reducer in sorted key order
– This step is known as the ‘shuffle and sort’
– The Reducer outputs zero or more final key/value pairs
– These are written to HDFS
– In practice, the Reducer usually emits a single key/value pair for each input key
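
The guarantee that every value for a given intermediate key reaches the same Reducer is typically achieved by hashing the key. The sketch below illustrates the idea only: zlib.crc32 is used here as a stable stand-in hash (an assumption for this example; Hadoop's Java code hashes the key object itself), and the function names are hypothetical.

```python
import zlib
from collections import defaultdict


def partition(key, num_reducers):
    # Same key -> same hash -> same Reducer index, on every Mapper.
    return zlib.crc32(str(key).encode("utf-8")) % num_reducers


def assign_to_reducers(pairs, num_reducers):
    # Group intermediate pairs by the Reducer that will receive them.
    buckets = defaultdict(list)
    for key, value in pairs:
        buckets[partition(key, num_reducers)].append((key, value))
    return dict(buckets)


pairs = [("the", 1), ("cat", 1), ("the", 1), ("sat", 1)]
buckets = assign_to_reducers(pairs, num_reducers=2)
print(buckets)
```

Because the assignment depends only on the key and the number of Reducers, Mappers on different nodes agree on the destination without coordinating.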

Example Reducer: Sum Reducer
– Add up all the values associated with each intermediate key (pseudo-code):
let reduce(k, vals) =
    sum = 0
    foreach int i in vals:
        sum += i
    emit(k, sum)
– (‘bar’, [9, 3, -17, 44]) -> (‘bar’, 39)

Example Reducer: Identity Reducer
let reduce(k, vals) =
    foreach v in vals:
        emit(k, v)
– (‘bar’, [123, 100, 77]) -> (‘bar’, 123), (‘bar’, 100), (‘bar’, 77)
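
The two example Reducers above translate directly into Python generators. As before, this is a sketch of the pseudo-code rather than Hadoop's Reducer API, and the function names are illustrative.

```python
def sum_reducer(k, vals):
    # Emit a single pair: the key and the total of its values.
    yield k, sum(vals)


def identity_reducer(k, vals):
    # Emit every value unchanged, paired with its key.
    for v in vals:
        yield k, v


print(list(sum_reducer("bar", [9, 3, -17, 44])))
print(list(identity_reducer("bar", [123, 100, 77])))
```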

MapReduce Example: Word Count
– count the number of occurrences of each word in a large amount of input data
– This is the ‘hello world’ of MapReduce programming
map(String input_key, String input_value)
    foreach word w in input_value:
        emit(w, 1)
reduce(String output_key, Iterator<int> intermediate_vals)
    set count = 0
    foreach v in intermediate_vals:
        count += v
    emit(output_key, count)
– input to the Mapper (3414 and 3437 are byte offsets, don’t care about these values)
(3414, ‘the cat sat on the mat’)
(3437, ‘the aardvark sat on the sofa’)
– output from the Mapper:
(‘the’, 1), (‘cat’, 1), (‘sat’, 1), (‘on’, 1), (‘the’, 1), (‘mat’, 1), (‘the’, 1), (‘aardvark’, 1), (‘sat’, 1), (‘on’, 1), (‘the’, 1), (‘sofa’, 1)
– intermediate data sent to the Reducer:
(‘aardvark’, [1])
(‘cat’, [1])
(‘mat’, [1])
(‘on’, [1, 1])
(‘sat’, [1, 1])
(‘sofa’, [1])
(‘the’, [1, 1, 1, 1])
– final Reducer output
(‘aardvark’, 1)
(‘cat’, 1)
(‘mat’, 1)
(‘on’, 2)
(‘sat’, 2)
(‘sofa’, 1)
(‘the’, 4)
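
The whole Word Count flow, including the shuffle and sort between the two phases, can be simulated in a few lines of single-process Python. This is a minimal sketch for following the data, not how Hadoop executes a job; run_job and the other names are hypothetical.

```python
from itertools import groupby
from operator import itemgetter


def wc_map(input_key, input_value):
    # The input key (byte offset) is ignored; emit (word, 1) per word.
    for w in input_value.split():
        yield w, 1


def wc_reduce(output_key, intermediate_vals):
    # Sum the 1s collected for this word.
    yield output_key, sum(intermediate_vals)


def run_job(records, mapper, reducer):
    # Map phase: apply the mapper to every input record.
    intermediate = [pair for k, v in records for pair in mapper(k, v)]
    # Shuffle and sort: order by key so equal keys become adjacent.
    intermediate.sort(key=itemgetter(0))
    output = []
    # Reduce phase: one reducer call per key, with the list of its values.
    for key, group in groupby(intermediate, key=itemgetter(0)):
        output.extend(reducer(key, [v for _, v in group]))
    return output


records = [
    (3414, "the cat sat on the mat"),
    (3437, "the aardvark sat on the sofa"),
]
result = run_job(records, wc_map, wc_reduce)
for word, count in result:
    print(word, count)
```

Sorting before grouping mirrors the way Reducers receive keys in sorted order, each with its complete list of values.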

MapReduce: Data Locality
– Whenever possible, Hadoop will attempt to ensure that a Map task on a node is working on a block of data stored locally on that node via HDFS
– If this is not possible, the Map task will have to transfer the data across the network as it processes that data
– once the Map tasks have finished, data is then transferred across the network to the Reducers
– Although the Reducers may run on the same physical machines as the Map tasks, there’s no concept of data locality for the Reducers
– All Mappers will, in general, have to communicate with all Reducers

MapReduce: Is Shuffle and Sort a Bottleneck?
– it would appear that the shuffle and sort phase is a bottleneck
– the reduce method in the Reducers cannot start until all Mappers have finished
– in practice, Hadoop will start to transfer data from Mappers to Reducers as the Mappers finish work
– this avoids a huge amount of data transfer starting only when the last Mapper finishes
– note that this behavior is configurable
– the developer can specify the % of Mappers which should finish before Reducers start retrieving data
– the developer’s reduce method still does not start until all intermediate data has been transferred and sorted

MapReduce: Is a Slow Mapper a Bottleneck?
– it is possible for one Map task to run more slowly than the others
– perhaps due to faulty hardware, or just a very slow machine
– it would appear this would create a bottleneck
– the reduce method in the Reducer cannot start until every Mapper has finished
– Hadoop uses speculative execution to mitigate this
– if a Mapper appears to be running significantly more slowly than the others, a new instance of the Mapper will be started on another machine, operating on the same data
– the results of the first Mapper to finish will be used
– Hadoop will kill off the Mapper which is still running

Creating and Running a MapReduce Job
– write the Mapper and Reducer classes
– write a Driver class that configures the job and submits it to the cluster
– compile the Mapper, Reducer, and Driver classes
– javac -classpath `hadoop classpath` *.java
– create a jar file with the Mapper, Reducer, and Driver classes
– jar cvf foo.jar *.class
– run the hadoop jar command to submit the job to the Hadoop cluster
– hadoop jar foo.jar Foo in_dir out_dir
