Big Data:
• Big data is a term used to describe the voluminous amount of unstructured and semi-structured data a company creates.
• Data that would take too much time and cost too much money to load into a relational database for analysis.
• Big data doesn't refer to any specific quantity; the term is often used when speaking about petabytes and exabytes of data.
What Caused The Problem?

Year | Standard Hard Drive Size (MB) | Data Transfer Rate (MB/s)
1990 | 1,370                         | 4.4
2010 | 1,000,000                     | 100
So What Is The Problem?
A standard disk is 1 terabyte and the transfer speed is around 100 MB/s, so the time to read the entire disk is about 10,000 seconds, or roughly 3 hours!
An increase in processing speed may not be as helpful because:
• Network bandwidth is now more of a limiting factor
• Physical limits of processor chips have been reached
So What do We Do?
• The obvious solution is that we use multiple processors to solve the same problem by fragmenting it into pieces.
• Imagine if we had 100 drives, each holding one hundredth of the data. Working in parallel, we could read the data in under two minutes.
Distributed Computing Vs Parallelization
Distributed Computing - multiple computers connected via a network.
Parallelization - multiple processors or CPUs in a single machine.

Distributed Computing
The key issues involved in this solution:
• Network-associated problems
• Combining the data after analysis
• Hardware failure

What Can We Do With A Distributed Computer System?
• Simulating an internet-size network for network experiments
• Indexing the Web (Google)
• Simulating several hundreds of characters (LOTR)
• Multiplying large matrices
• IBM Deep Blue
Problems In Distributed Computing
• Hardware Failure: As soon as we start using many pieces of hardware, the chance that one will fail is fairly high.
• Combining the data after analysis: Most analysis tasks need to be able to combine the data in some way; data read from one disk may need to be combined with the data from any of the other 99 disks.
To The Rescue!
Apache Hadoop is a framework for running applications on large clusters built of commodity hardware. A common way of avoiding data loss is through replication: redundant copies of the data are kept by the system so that in the event of failure, there is another copy available. The Hadoop Distributed Filesystem (HDFS) takes care of this problem. The second problem is solved by a simple programming model, MapReduce. Hadoop is the popular open source implementation of MapReduce, a powerful tool designed for deep analysis and transformation of very large data sets.
Hadoop is a reliable shared storage and analysis system. Beyond this core, there are other subprojects of Hadoop that provide complementary services or build on the core to add higher-level abstractions.
Hadoop Approach to Distributed Computing
A theoretical 1000-CPU machine would cost a very large amount of money, far more than 1,000 single-CPU machines. Hadoop ties these smaller and more reasonably priced machines together into a single cost-effective compute cluster. Hadoop provides a simplified programming model which allows the user to quickly write and test distributed systems; its efficient, automatic distribution of data and work across machines in turn utilizes the underlying parallelism of the CPU cores.
MapReduce
Hadoop limits the amount of communication which can be performed by the processes, as each individual record is processed by a task in isolation from the others. By restricting the communication between nodes, Hadoop makes the distributed system much more reliable: individual node failures can be worked around by restarting tasks on other machines. The other workers continue to operate as though nothing went wrong, leaving the challenging aspects of partially restarting the program to the underlying Hadoop layer.

Map:    (in_key, in_value)                 → (out_key, intermediate_value) list
Reduce: (out_key, intermediate_value list) → out_value list
What is MapReduce?
MapReduce is a programming model and an associated implementation for processing and generating large data sets. Programs written in this functional style are automatically parallelized and executed on a large cluster of commodity machines.
The Programming Model Of MapReduce
Map, written by the user, takes an input pair and produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key I and passes them to the Reduce function.
Example: Word Count

map(String key, String value):
  // key: document name
  // value: document contents
  for each word w in value:
    EmitIntermediate(w, "1");

reduce(String key, Iterator values):
  // key: a word
  // values: a list of counts
  int result = 0;
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(result));

This abstraction allows us to handle lists of values that are too large to fit in memory.
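For reference, the same word-count example can be written against the Hadoop Java MapReduce API. The sketch below follows the standard org.apache.hadoop.mapreduce classes; it is illustrative rather than a definitive implementation, and details may vary slightly between Hadoop versions.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

  // Map: (document offset, line of text) -> list of (word, 1)
  public static class TokenizerMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, ONE);   // emit intermediate pair (word, 1)
      }
    }
  }

  // Reduce: (word, list of counts) -> (word, total count)
  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();           // sum the counts for this word
      }
      result.set(sum);
      context.write(key, result);   // emit final pair (word, sum)
    }
  }
}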
Orientation of Nodes: Data Locality Optimization
The compute nodes and the storage nodes are the same. The MapReduce framework and the Distributed File System run on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster. If this is not possible, the computation is done by another processor on the same rack. "Moving Computation is Cheaper than Moving Data."
How MapReduce Works
A MapReduce job is a unit of work that the client wants to be performed: it consists of the input data, the MapReduce program, and configuration information. Hadoop runs the job by dividing it into tasks, of which there are two types: map tasks and reduce tasks. A MapReduce job usually splits the input data set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a filesystem. The framework takes care of scheduling tasks, monitoring them, and re-executing the failed tasks.
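As an illustration of how a client describes such a unit of work, the sketch below shows a driver that configures and submits the word-count job from the earlier example using the standard Job API; the input and output paths are placeholders taken from the command line.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The job = input data + MapReduce program + configuration information.
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(WordCount.TokenizerMapper.class);
    job.setReducerClass(WordCount.IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));    // job input in the filesystem
    FileOutputFormat.setOutputPath(job, new Path(args[1]));  // job output in the filesystem
    // The framework schedules the map and reduce tasks, monitors them,
    // and re-executes failed tasks; the driver just waits for completion.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}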
Fault Tolerance
There are two types of nodes that control the job execution process: jobtrackers and tasktrackers. The jobtracker coordinates all the jobs run on the system by scheduling tasks to run on tasktrackers. Tasktrackers run tasks and send progress reports to the jobtracker, which keeps a record of the overall progress of each job. If a task fails, the jobtracker can reschedule it on a different tasktracker.
Input Splits
Hadoop divides the input to a MapReduce job into fixed-size pieces called input splits, or just splits. Hadoop creates one map task for each split, which runs the user-defined map function for each record in the split.
WHY? The quality of the load balancing increases as the splits become more fine-grained. BUT if splits are too small, then the overhead of managing the splits and of map task creation begins to dominate the total job execution time. For most jobs, a good split size tends to be the size of an HDFS block, 64 MB by default.
Map tasks write their output to local disk, not to HDFS. Map output is intermediate output: it is processed by reduce tasks to produce the final output, and once the job is complete the map output can be thrown away. Storing it in HDFS, with replication, would therefore be a waste of time. It is also possible that the node running the map task fails before the map output has been consumed by the reduce task; in that case the map task is simply rerun on another node.
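If the default split size is not appropriate for a particular job, it can be nudged through the standard FileInputFormat helpers. The sketch below is a hedged example; the 64 MB figure simply mirrors the default HDFS block size mentioned above.

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizing {
  // Keep each split close to one HDFS block so map tasks are neither
  // too fine-grained (scheduling overhead) nor too coarse (poor load balancing).
  public static void configure(Job job) {
    long blockSize = 64L * 1024 * 1024;                   // 64 MB, the default HDFS block size
    FileInputFormat.setMinInputSplitSize(job, blockSize); // avoid splits much smaller than a block
    FileInputFormat.setMaxInputSplitSize(job, blockSize); // avoid splits spanning multiple blocks
  }
}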
Input to Reduce Tasks
Reduce tasks don't have the advantage of data locality; the input to a single reduce task is normally the output from all mappers.
(Figures: MapReduce data flow with a single reduce task; MapReduce data flow with multiple reduce tasks; MapReduce data flow with no reduce tasks.)
Combiner Functions
• Many MapReduce jobs are limited by the bandwidth available on the cluster.
• In order to minimize the data transferred between the map and reduce tasks, combiner functions are introduced.
• Hadoop allows the user to specify a combiner function to be run on the map output; the combiner function's output forms the input to the reduce function.
• Combiner functions can help cut down the amount of data shuffled between the maps and the reduces (see the sketch after this list).
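A minimal sketch of enabling a combiner for the word-count job above: because summing counts is associative and commutative, the reducer class can double as the combiner. The class names refer to the illustrative WordCount example earlier in these notes.

import org.apache.hadoop.mapreduce.Job;

public class CombinerSetup {
  public static void configure(Job job) {
    // Run IntSumReducer on each map's output before the shuffle,
    // cutting down the data transferred between map and reduce tasks.
    job.setCombinerClass(WordCount.IntSumReducer.class);
  }
}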
Hadoop Streaming:
• Hadoop provides an API to MapReduce that allows you to write your map and reduce functions in languages other than Java.
• Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program, so you can use any language that can read standard input and write to standard output to write your MapReduce program (see the sketch after this list).
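To keep the examples in one language, the sketch below shows the Streaming contract in Java: any executable that reads records from standard input and writes tab-separated key/value pairs to standard output can act as the map step (in practice Streaming is usually paired with scripting languages such as Python or Ruby).

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class StreamingWordCountMapper {
  public static void main(String[] args) throws Exception {
    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
    String line;
    while ((line = in.readLine()) != null) {
      // Emit one "key<TAB>value" pair per word, as Streaming expects.
      for (String word : line.trim().split("\\s+")) {
        if (!word.isEmpty()) {
          System.out.println(word + "\t1");
        }
      }
    }
  }
}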
Hadoop Pipes:
• Hadoop Pipes is the name of the C++ interface to Hadoop MapReduce.
• Unlike Streaming, which uses standard input and output to communicate with the map and reduce code, Pipes uses sockets as the channel over which the tasktracker communicates with the process running the C++ map or reduce function. JNI is not used.
HADOOP DISTRIBUTED FILESYSTEM (HDFS)
Filesystems that manage the storage across a network of machines are called distributed filesystems. Hadoop comes with a distributed filesystem called HDFS, which stands for Hadoop Distributed Filesystem. HDFS is designed to hold very large amounts of data (terabytes or even petabytes) and provide high-throughput access to this information.
Problems In Distributed File Systems
Making distributed filesystems is more complex than regular disk filesystems. This is because the data is spanned over multiple nodes, so all the complications of network programming kick in.
• Hardware Failure
An HDFS instance may consist of hundreds or thousands of server machines, each storing part of the file system's data. The fact that there are a huge number of components and that each component has a non-trivial probability of failure means that some component of HDFS is always non-functional. Therefore, detection of faults and quick, automatic recovery from them is a core architectural goal of HDFS.
• Large Data Sets
Applications that run on HDFS have large data sets. A typical file in HDFS is gigabytes to terabytes in size. Thus, HDFS is tuned to support large files. It should provide high aggregate data bandwidth and scale to hundreds of nodes in a single cluster. It should support tens of millions of files in a single instance.
Goals of HDFS
• Streaming Data Access
Applications that run on HDFS need streaming access to their data sets. They are not general purpose applications that typically run on general purpose file systems. HDFS is designed more for batch processing rather than interactive use by users. The emphasis is on high throughput of data access rather than low latency of data access. POSIX imposes many hard requirements that are not needed for applications that are targeted for HDFS. POSIX semantics in a few key areas have been traded to increase data throughput rates.
• Simple Coherency Model
HDFS applications need a write-once-read-many access model for files. A file once created, written, and closed need not be changed. This assumption simplifies data coherency issues and enables high throughput data access. A MapReduce application or a web crawler application fits perfectly with this model. There is a plan to support appending writes to files in the future.
• "Moving Computation is Cheaper than Moving Data"
A computation requested by an application is much more efficient if it is executed near the data it operates on. This is especially true when the size of the data set is huge. It minimizes network congestion and increases the overall throughput of the system. The assumption is that it is often better to migrate the computation closer to where the data is located rather than moving the data to where the application is running. HDFS provides interfaces for applications to move themselves closer to where the data is located.
• Portability Across Heterogeneous Hardware and Software Platforms
HDFS has been designed to be easily portable from one platform to another. This facilitates widespread adoption of HDFS as a platform of choice for a large set of applications.
Design of HDFS
• Very large files
Files that are hundreds of megabytes, gigabytes, or terabytes in size. There are Hadoop clusters running today that store petabytes of data.
• Streaming data access
HDFS is built around the idea that the most efficient data processing pattern is a write-once, read-many-times pattern. A dataset is typically generated or copied from source, and then various analyses are performed on that dataset over time. Each analysis will involve a large proportion of the dataset, so the time to read the whole dataset is more important than the latency in reading the first record.
• Commodity hardware
Hadoop doesn't require expensive, highly reliable hardware to run on. It's designed to run on clusters of commodity hardware for which the chance of node failure across the cluster is high, at least for large clusters. HDFS is designed to carry on working without a noticeable interruption to the user in the face of such failure.
It is also worth examining the applications for which using HDFS does not work so well. While this may change in the future, these are areas where HDFS is not a good fit today:
• Low-latency data access
Applications that require low-latency access to data, in the tens of milliseconds range, will not work well with HDFS. Remember that HDFS is optimized for delivering a high throughput of data, and this may be at the expense of latency. HBase is currently a better choice for low-latency access.
• Lots of small files
Since the namenode holds filesystem metadata in memory, the limit to the number of files in a filesystem is governed by the amount of memory on the namenode. As a rule of thumb, each file, directory, and block takes about 150 bytes. So, for example, if you had one million files, each taking one block, you would need at least 300 MB of memory. While storing millions of files is feasible, billions is beyond the capability of current hardware.
• Multiple writers, arbitrary file modifications
Files in HDFS may be written to by a single writer. Writes are always made at the end of the file. There is no support for multiple writers, or for modifications at arbitrary offsets in the file. (These might be supported in the future, but they are likely to be relatively inefficient.)
Concepts of HDFS:
Blocks:
• A block is the minimum amount of data that can be read or written.
• 64 MB by default.
• Files in HDFS are broken into block-sized chunks, which are stored as independent units.

Block Abstraction
HDFS blocks are large compared to disk blocks, and the reason is to minimize the cost of seeks. By making a block large enough, the time to transfer the data from the disk can be made significantly larger than the time to seek to the start of the block. Thus the time to transfer a large file made of multiple blocks operates at the disk transfer rate. (For example, with a seek time of about 10 ms and a transfer rate of 100 MB/s, a block of around 100 MB keeps the seek overhead to roughly 1% of the transfer time.)
Benefits of Block Abstraction
• A file can be larger than any single disk in the network. There's nothing that requires the blocks from a file to be stored on the same disk, so they can take advantage of any of the disks in the cluster (see the sketch after this list).
• Making the unit of abstraction a block rather than a file simplifies the storage subsystem.
• Blocks provide fault tolerance and availability. To insure against corrupted blocks and disk and machine failure, each block is replicated to a small number of physically separate machines (typically three). If a block becomes unavailable, a copy can be read from another location in a way that is transparent to the client.
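A minimal sketch of inspecting the block size and block locations of a file through the FileSystem API (the path is a placeholder); each BlockLocation describes one block-sized chunk and the datanodes holding its replicas.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockInfo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FileStatus status = fs.getFileStatus(new Path("/user/data/big.log")); // placeholder path
    System.out.println("Block size: " + status.getBlockSize() + " bytes");
    // List the hosts storing each block-sized chunk of the file.
    for (BlockLocation loc : fs.getFileBlockLocations(status, 0, status.getLen())) {
      System.out.println(loc);
    }
  }
}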
Hadoop Archives
HDFS stores small files inefficiently, since each file is stored in a block, and block metadata is held in memory by the namenode. Thus, a large number of small files can eat up a lot of memory on the namenode. Hadoop Archives, or HAR files, are a file archiving facility that packs files into HDFS blocks more efficiently, thereby reducing namenode memory usage while still allowing transparent access to files. Hadoop Archives can be used as input to MapReduce.
Limitations of Archiving
• There is currently no support for archive compression, although the files that go into the archive can be compressed.
• Archives are immutable once they have been created. To add or remove files, you must recreate the archive.
Namenodes and Datanodes
An HDFS cluster has two types of node operating in a master-worker pattern: a namenode (the master) and a number of datanodes (workers). The namenode manages the filesystem namespace. It maintains the filesystem tree and the metadata for all the files and directories in the tree. Datanodes are the workhorses of the filesystem. They store and retrieve blocks when they are told to (by clients or the namenode), and they report back to the namenode periodically with lists of blocks that they are storing.
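A minimal sketch of a client reading a file from HDFS with the Java FileSystem API: the client contacts the namenode for metadata and then streams block data from the datanodes. The namenode address and the file path are illustrative placeholders.

import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsRead {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://namenode:8020");     // namenode address (placeholder)
    FileSystem fs = FileSystem.get(conf);                 // metadata operations go to the namenode
    try (InputStream in = fs.open(new Path("/user/data/input.txt"))) {
      IOUtils.copyBytes(in, System.out, 4096, false);     // block contents are streamed from datanodes
    }
  }
}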
Without the namenode, the filesystem cannot be used. In fact, if the machine running the namenode were obliterated, all the files on the filesystem would be lost, since there would be no way of knowing how to reconstruct the files from the blocks on the datanodes.
It is therefore important to make the namenode resilient to failure, and Hadoop provides two mechanisms for this:
1. Back up the files that make up the persistent state of the filesystem metadata. Hadoop can be configured so that the namenode writes its persistent state to multiple filesystems.
2. Run a secondary namenode. The secondary namenode usually runs on a separate physical machine, since it requires plenty of CPU and as much memory as the namenode to perform the merge. It keeps a copy of the merged namespace image, which can be used in the event of the namenode failing.
File System Namespace
HDFS supports a traditional hierarchical file organization. A user or an application can create and remove files, move a file from one directory to another, rename a file, create directories, and store files inside these directories. HDFS does not yet implement user quotas or access permissions. HDFS does not support hard links or soft links. However, the HDFS architecture does not preclude implementing these features. The Namenode maintains the file system namespace. Any change to the file system namespace or its properties is recorded by the Namenode. An application can specify the number of replicas of a file that should be maintained by HDFS. The number of copies of a file is called the replication factor of that file. This information is stored by the Namenode.
Data Replication
The blocks of a file are replicated for fault tolerance. The NameNode makes all decisions regarding replication of blocks. It periodically receives a Heartbeat and a Blockreport from each of the DataNodes in the cluster. Receipt of a Heartbeat implies that the DataNode is functioning properly. A Blockreport contains a list of all blocks on a DataNode. When the replication factor is three, HDFS's placement policy is to put one replica on one node in the local rack, another on a different node in the local rack, and the last on a different node in a different rack.
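A hedged sketch of working with the replication factor from a client: the default factor can be set cluster-wide via the dfs.replication property, while the per-file call below asks the namenode to maintain three copies of each block of one file (the path and the factor are illustrative).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReplicationExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Ask the namenode to keep three replicas of each block of this file.
    fs.setReplication(new Path("/user/data/important.dat"), (short) 3);
  }
}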