Hadoop Tutorials

Hadoop Introduction<div dir="ltr" style="text-align: left;" trbidi="on">
Hadoop is a large-scale distributed batch processing infrastructure. While it can be used on a single machine, its true power lies in its ability to scale to hundreds or thousands of computers, each with several processor cores. Hadoop is also designed to efficiently distribute large amounts of work across a set of machines.<br />
<br />
<span style="font-weight: bold;">How large an amount of work?</span> Orders of magnitude larger than many existing systems can handle. Hundreds of gigabytes of data constitute the low end of Hadoop-scale: Hadoop is built to process "web-scale" data on the order of hundreds of gigabytes to terabytes or petabytes. At this scale, it is likely that the input data set will not even fit on a single computer's hard drive, much less in memory. Hadoop therefore includes a distributed file system which breaks up input data and sends fractions of the original data to several machines in your cluster to hold. The problem can then be processed in parallel using all of the machines in the cluster, and output results are computed as efficiently as possible.<br />
<br />
<u>Challenges at Large Scale</u><br />
<br />
Performing large-scale computation is difficult. Working with this volume of data requires distributing parts of the problem to multiple machines to handle in parallel. Whenever multiple machines are used in cooperation with one another, the probability of failure rises. In a single-machine environment, failure is not something that program designers explicitly worry about very often: if the machine has crashed, then there is no way for the program to recover anyway.<br />
<br />
In a distributed environment, however, partial failures are an expected and common occurrence. Networks can experience partial or total failure if switches and routers break down. Data may not arrive at a particular point in time due to unexpected network congestion. Individual compute nodes may overheat, crash, experience hard drive failures, or run out of memory or disk space. Data may be corrupted, or maliciously or improperly transmitted. Multiple implementations or versions of client software may speak slightly different protocols from one another. Clocks may become desynchronized, lock files may not be released, parties involved in distributed atomic transactions may lose their network connections part-way through, etc. In each of these cases, the rest of the distributed system should be able to recover from the component failure or transient error condition and continue to make progress. Of course, actually providing such resilience is a major software engineering challenge.<br />
<br />
Different distributed systems specifically address certain modes of failure, while worrying less about others. Hadoop provides no security model, nor safeguards against maliciously inserted data. For example, it cannot detect a man-in-the-middle attack between nodes. On the other hand, it is designed to handle hardware failure and data congestion issues very robustly. Other distributed systems make different trade-offs, as they intend to be used for problems with other requirements (e.g., high security).<br />
<br />
In addition to worrying about these sorts of failures and challenges, there is also the fact that the compute hardware has finite resources available to it. The major resources include:<br />
<b><br /></b>
<b>* Processor time</b><br />
<b>* Memory</b><br />
<b>* Hard drive space</b><br />
<b>* Network bandwidth</b><br />
<br />
Individual machines typically only have a few gigabytes of memory. If the input data set is several terabytes, then this would require a thousand or more machines to hold it in RAM -- and even then, no single machine would be able to process or address all of the data.<br />
<br />
Hard drives are much larger; a single machine can now hold multiple terabytes of information on its hard drives. But intermediate data sets generated while performing a large-scale computation can easily fill up several times more space than what the original input data set had occupied. During this process, some of the hard drives employed by the system may become full, and the distributed system may need to route this data to other nodes which can store the overflow.<br />
<br />
Finally, bandwidth is a scarce resource even on an internal network. While a set of nodes directly connected by gigabit Ethernet may generally experience high throughput between them, if all of the machines are transmitting multi-gigabyte data sets, they can easily saturate the switch's bandwidth capacity. Additionally, if the machines are spread across multiple racks, the bandwidth available for the data transfer is much lower. Furthermore, RPC requests and other data transfer requests using this channel may be delayed or dropped.<br />
<br />
To be successful, a large-scale distributed system must be able to manage the resources mentioned above efficiently. Furthermore, it must allocate some of these resources toward maintaining the system as a whole, while devoting as much time as possible to the actual core computation.<br />
<br />
Synchronization between multiple machines remains the biggest challenge in distributed system design. If nodes in a distributed system can explicitly communicate with one another, then application designers must be cognizant of the risks associated with such communication patterns. It becomes very easy to generate more remote procedure calls (RPCs) than the system can satisfy! Performing multi-party data exchanges is also prone to deadlock or race conditions. Finally, the ability to continue computation in the face of failures becomes more challenging. For example, if 100 nodes are present in a system and one of them crashes, the other 99 nodes should be able to continue the computation, ideally with only a small penalty proportionate to the loss of 1% of the computing power. Of course, this will require re-computing any work lost on the unavailable node. Furthermore, if a complex communication network is overlaid on the distributed infrastructure, then determining how best to restart the lost computation and propagating this information about the change in network topology may be non-trivial to implement.<br />
<br /></div>
Interacting With HDFS<div dir="ltr" style="text-align: left;" trbidi="on">
The VMware image will expose a single-node HDFS instance for your use in MapReduce applications. If you are logged in to the virtual machine, you can interact with HDFS using the command-line tools described in Module 2. You can also manipulate HDFS through the MapReduce plugin.<br />
Using the Command Line<br />
<br />
An interesting MapReduce task will require some external data to process: log files, web crawl results, etc. Before you can begin processing with MapReduce, data must be loaded into its distributed file system. In Module 2, you learned how to copy files from the local file system into HDFS. But this will copy files from the local file system of the VM into HDFS - not from the file system of your host computer.<br />
<br />
To load data into HDFS in the virtual machine, you have several options available to you:<br />
<br />
1. scp the files to the virtual machine, and then use the bin/hadoop fs -put ... syntax to copy the files from the VM's local file system into HDFS,<br />
2. pipe the data from the local machine into a put command reading from stdin,<br />
3. or install the Hadoop tools on the host system and configure them to communicate directly with the guest instance.<br />
<br />
We will review each of these in turn.<br />
<br />
To load data into HDFS using the command line within the virtual machine, you can first send the data to the VM's local disk, then insert it into HDFS. You can send files to the VM using an scp client, such as the pscp component of putty, or WinSCP.<br />
<br />
scp will allow you to copy files from one machine to another over the network. The scp command takes two arguments, both of the form [[username@]hostname:]filename. The scp command itself is of the form scp source dest, where source and dest are formatted as described above. By default, it will assume that paths are on the local host and should be accessed using the current username. You can override the username and hostname to perform remote copies.<br />
<br />
So supposing you have a file named foo.txt, and you would like to copy this into the virtual machine which has IP address 192.168.190.128, you can perform this operation with the command:<br />
<br />
$ scp foo.txt hadoop-user@192.168.190.128:foo.txt<br />
<br />
If you are using the pscp program, substitute pscp instead of scp above. A copy of the "regular" scp can be run under cygwin by downloading the OpenSSH package. pscp is a utility by the makers of putty and does not require cygwin.<br />
<br />
Note that since we did not specify a destination directory, it will go in /home/hadoop-user by default. To change the target directory, specify it after the hostname (e.g., hadoop-user@192.168.190.128:/some/dest/path/foo.txt). You can also omit the destination filename if you want it to be identical to the source filename. However, if you omit both the target directory and filename, you must not forget the colon (":") that follows the target hostname. Otherwise it will make a local copy of the file, with the name 192.168.190.128. An equivalent correct command to copy foo.txt to /home/hadoop-user on the remote machine is:<br />
<br />
$ scp foo.txt hadoop-user@192.168.190.128:<br />
<br />
Windows users may be more inclined to use a GUI tool to perform scp commands. The free WinSCP program provides an FTP-like GUI interface over scp.<br />
<br />
After you have copied files into the local disk of the virtual machine, you can log in to the virtual machine as hadoop-user and insert the files into HDFS using the standard Hadoop commands. For example,<br />
<br />
hadoop-user@vm-instance:hadoop$ bin/hadoop dfs -put ~/foo.txt \<br />
/user/hadoop-user/input/foo.txt<br />
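<br />
You can verify that the file arrived by listing the target directory from within the VM (a quick check; the path matches the example above):<br />
<br />
hadoop-user@vm-instance:hadoop$ bin/hadoop dfs -ls /user/hadoop-user/input<br />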
<br />
A second option available to upload individual files to HDFS from the host machine is to echo the file contents into a put command running via ssh. e.g., assuming you have the cat program (which comes with Linux or cygwin) to echo the contents of a file to the terminal output, you can connect its output to the input of a put command running over ssh like so:<br />
<br />
you@host-machine$ cat somefile | ssh hadoop-user@vm-ip-addr \<br />
"hadoop/bin/hadoop fs -put - destinationfile"<br />
<br />
The - as an argument to the put command instructs the system to use stdin as its input file. This will copy somefile on the host machine to destinationfile in HDFS on the virtual machine.<br />
<br />
Finally, if you are running either Linux or cygwin, you can copy the /hadoop-0.18.0 directory on the CD to your local instance. You can then configure hadoop-site.xml to use the virtual machine as the default distributed file system (by setting the fs.default.name parameter). If you then run bin/hadoop fs -put ... commands on this machine (or any other hadoop commands, for that matter), they will interact with HDFS as served by the virtual machine. See the Hadoop quickstart for instructions on configuring a Hadoop installation, or Module 7 for a more thorough treatment.<br />
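<br />
As a rough sketch, the relevant entry in the host's conf/hadoop-site.xml might look like the following. The IP address is an assumption (use the one your virtual machine prints at startup), and port 9000 matches the DFS master port used elsewhere in this tutorial:<br />
<br />
<configuration><br />
  <property><br />
    <name>fs.default.name</name><br />
    <value>hdfs://192.168.190.128:9000/</value><br />
  </property><br />
</configuration><br />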
Using the MapReduce Plugin For Eclipse<br />
<br />
An easier way to manipulate files in HDFS may be through the Eclipse plugin. In the DFS location viewer, right-click on any folder to see a list of actions available. You can create new subdirectories, upload individual files or whole subdirectories, or download files and directories to the local disk.<br />
<br />
If /user/hadoop-user does not exist, create that first. Right-click on the top-level directory and select "Create New Directory". Type "user" and click OK. You will then need to refresh the current directory view by right-clicking and selecting "Refresh" from the pop-up menu. Repeat this process to create the "hadoop-user" directory under "user."<br />
<br />
Now, prepare some local files to upload. Somewhere on your hard drive, create a directory named "input" and find some text files to copy there. In the DFS explorer, right-click the "hadoop-user" directory and click "Upload Directory to DFS." Select your new input folder and click OK. Eclipse will copy the files directly into HDFS, bypassing the local drive of the virtual machine. You may have to refresh the directory view to see your changes. You should now have a directory hierarchy containing the /user/hadoop-user/input directory, which has at least one text file in it.<br />
Running a Sample Program<br />
<br />
While we have not yet formally introduced the programming style for Hadoop, we can still test whether a MapReduce program will run on our Hadoop virtual machine. This section walks you through the steps required to verify this.<br />
<br />
The program that we will run is a word count utility. The program will read the files you uploaded to HDFS in the previous section, and determine how many times each word in the files appears.<br />
<br />
If you have not already done so, start the virtual machine and Eclipse, and switch Eclipse to use the MapReduce perspective. Instructions are in the previous section.<br />
Creating the Project<br />
<br />
In the menu, click File * New * Project. Select "Map/Reduce Project" from the list and click Next.<br />
<br />
You now need to select a project name. Any name will do, e.g., "WordCount". You will also need to specify the Hadoop Library Installation Path. This is the path where you made a copy of the /hadoop-0.18.0 folder on the CD. Since we have not yet configured this part of Eclipse, do so now by clicking "Configure Hadoop install directory..." and choosing the path where you copied Hadoop to. There should be a file named hadoop-0.18.0-core.jar in this directory. Creating a MapReduce Project instead of a generic Java project automatically adds the prerequisite jar files to the build path. If you create a regular Java project, you must add the Hadoop jar (and its dependencies) to the build path manually.<br />
<br />
When you have completed these steps, click Finish.<br />
Creating the Source Files<br />
<br />
Our program needs three classes to run: a Mapper, a Reducer, and a Driver. The Driver tells Hadoop how to run the MapReduce process. The Mapper and Reducer operate on your data.<br />
<br />
Right-click on the "src" folder under your project and select New * Other.... In the "Map/Reduce" folder on the resulting window, we can create Mapper, Reducer, and Driver classes based on pre-written stub code. Create classes named WordCountMapper, WordCountReducer, and WordCount that use the Mapper, Reducer, and Driver stubs respectively.<br />
<br />
The code for each of these classes is shown here. You can copy this code into your files.<br />
<br />
WordCountMapper.java:<br />
<br />
import java.io.IOException;<br />
import java.util.StringTokenizer;<br />
<br />
import org.apache.hadoop.io.IntWritable;<br />
import org.apache.hadoop.io.LongWritable;<br />
import org.apache.hadoop.io.Text;<br />
import org.apache.hadoop.mapred.MapReduceBase;<br />
import org.apache.hadoop.mapred.Mapper;<br />
import org.apache.hadoop.mapred.OutputCollector;<br />
import org.apache.hadoop.mapred.Reporter;<br />
<br />
public class WordCountMapper extends MapReduceBase<br />
    implements Mapper<LongWritable, Text, Text, IntWritable> {<br /><br />
  private final IntWritable one = new IntWritable(1);<br />
  private Text word = new Text();<br /><br />
  public void map(LongWritable key, Text value,<br />
      OutputCollector<Text, IntWritable> output, Reporter reporter)<br />
      throws IOException {<br /><br />
    // Tokenize the line and emit (word, 1) for each token.<br />
    String line = value.toString();<br />
    StringTokenizer itr = new StringTokenizer(line.toLowerCase());<br />
    while (itr.hasMoreTokens()) {<br />
      word.set(itr.nextToken());<br />
      output.collect(word, one);<br />
    }<br />
  }<br />
}<br /><br />
WordCountReducer.java:<br /><br />
import java.io.IOException;<br />
import java.util.Iterator;<br /><br />
import org.apache.hadoop.io.IntWritable;<br />
import org.apache.hadoop.io.Text;<br />
import org.apache.hadoop.mapred.MapReduceBase;<br />
import org.apache.hadoop.mapred.OutputCollector;<br />
import org.apache.hadoop.mapred.Reducer;<br />
import org.apache.hadoop.mapred.Reporter;<br /><br />
public class WordCountReducer extends MapReduceBase<br />
    implements Reducer<Text, IntWritable, Text, IntWritable> {<br /><br />
  public void reduce(Text key, Iterator<IntWritable> values,<br />
      OutputCollector<Text, IntWritable> output, Reporter reporter)<br />
      throws IOException {<br /><br />
    // Sum the counts emitted for this word.<br />
    int sum = 0;<br />
    while (values.hasNext()) {<br />
      sum += values.next().get();<br />
    }<br /><br />
    output.collect(key, new IntWritable(sum));<br />
  }<br />
}<br /><br />
WordCount.java:<br /><br />
import org.apache.hadoop.fs.Path;<br />
import org.apache.hadoop.io.IntWritable;<br />
import org.apache.hadoop.io.Text;<br />
import org.apache.hadoop.mapred.FileInputFormat;<br />
import org.apache.hadoop.mapred.FileOutputFormat;<br />
import org.apache.hadoop.mapred.JobClient;<br />
import org.apache.hadoop.mapred.JobConf;<br /><br />
public class WordCount {<br /><br />
  public static void main(String[] args) {<br />
    JobClient client = new JobClient();<br />
    JobConf conf = new JobConf(WordCount.class);<br /><br />
    // specify output types<br />
    conf.setOutputKeyClass(Text.class);<br />
    conf.setOutputValueClass(IntWritable.class);<br /><br />
    // specify input and output dirs (relative paths resolve under /user/hadoop-user)<br />
    FileInputFormat.setInputPaths(conf, new Path("input"));<br />
    FileOutputFormat.setOutputPath(conf, new Path("output"));<br /><br />
    // specify a mapper<br />
    conf.setMapperClass(WordCountMapper.class);<br /><br />
    // specify a reducer (also used as a combiner to pre-aggregate map output)<br />
    conf.setReducerClass(WordCountReducer.class);<br />
    conf.setCombinerClass(WordCountReducer.class);<br /><br />
    client.setConf(conf);<br />
    try {<br />
      JobClient.runJob(conf);<br />
    } catch (Exception e) {<br />
      e.printStackTrace();<br />
    }<br />
  }<br />
}<br /><br />
For now, don't worry about how these functions work; we will introduce how to write MapReduce programs in Module 4. We currently just want to establish that we can run jobs on the virtual machine.<br />
Launching the Job<br /><br />
After the code has been entered, it is time to run it. You have already created a directory named input below /user/hadoop-user in HDFS. This will serve as the input files for this process. In the Project Explorer, right-click on the driver class, WordCount.java. In the pop-up menu, select Run As * Run On Hadoop. A window will appear asking you to select a Hadoop location to run on.
Select the VMware server that you configured earlier, and click Finish.<br /><br />If all goes well, the progress output from Hadoop should appear in the console in Eclipse; it should look something like:<br /><br />08/06/25 12:14:22 INFO mapred.FileInputFormat: Total input paths to process : 3<br />08/06/25 12:14:23 INFO mapred.JobClient: Running job: job_200806250515_0002<br />08/06/25 12:14:24 INFO mapred.JobClient: map 0% reduce 0%<br />08/06/25 12:14:31 INFO mapred.JobClient: map 50% reduce 0%<br />08/06/25 12:14:33 INFO mapred.JobClient: map 100% reduce 0%<br />08/06/25 12:14:42 INFO mapred.JobClient: map 100% reduce 100%<br />08/06/25 12:14:43 INFO mapred.JobClient: Job complete: job_200806250515_0002<br />08/06/25 12:14:43 INFO mapred.JobClient: Counters: 12<br />08/06/25 12:14:43 INFO mapred.JobClient: Job Counters<br />08/06/25 12:14:43 INFO mapred.JobClient: Launched map tasks=4<br />08/06/25 12:14:43 INFO mapred.JobClient: Launched reduce tasks=1<br />08/06/25 12:14:43 INFO mapred.JobClient: Data-local map tasks=4<br />08/06/25 12:14:43 INFO mapred.JobClient: Map-Reduce Framework<br />08/06/25 12:14:43 INFO mapred.JobClient: Map input records=211<br />08/06/25 12:14:43 INFO mapred.JobClient: Map output records=1609<br />08/06/25 12:14:43 INFO mapred.JobClient: Map input bytes=11627<br />08/06/25 12:14:43 INFO mapred.JobClient: Map output bytes=16918<br />08/06/25 12:14:43 INFO mapred.JobClient: Combine input records=1609<br />08/06/25 12:14:43 INFO mapred.JobClient: Combine output records=682<br />08/06/25 12:14:43 INFO mapred.JobClient: Reduce input groups=568<br />08/06/25 12:14:43 INFO mapred.JobClient: Reduce input records=682<br />08/06/25 12:14:43 INFO mapred.JobClient: Reduce output records=568<br /><br />In the DFS Explorer, right-click on /user/hadoop-user and select "Refresh." You should now see an "output" directory containing a file named part-00000. This is the output of the job. Double-clicking this file will allow you to view it in Eclipse; you can see each word and its frequency in the documents. (You may first receive a warning that this file is larger than 1 MB. Click OK.)<br /><br />If you want to run the job again, you will need to delete the output directory first. Right-click the output directory in the DFS Explorer and click "Delete."<br /><br />Congratulations! You should now have a functioning Hadoop development environment.</div>
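<br />
If you prefer the command line, you can also view the job output from a terminal inside the VM; for example (the path matches the job above):<br />
<br />
hadoop-user@vm-instance:hadoop$ bin/hadoop dfs -cat /user/hadoop-user/output/part-00000<br />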
Getting Started With Eclipse<div dir="ltr" style="text-align: left;" trbidi="on">
Eclipse is a powerful development environment for Java-based programming. It is a free, open-source IDE that supports multiple languages through a plugin interface, with special attention paid to Java. Tools designed for working with Hadoop can be integrated into Eclipse, making it an attractive platform for Hadoop development. In this section we will review how to obtain, configure, and use Eclipse.<br />
Downloading and Installing<br />
<br />
Note: The most current release of Eclipse is called Ganymede. Our testing shows that Ganymede is currently incompatible with the Hadoop MapReduce plugin. The most recent version which worked properly with the Hadoop plugin is version 3.3.1, "Europa." To download Europa, do not visit the main Eclipse website; it can be found in the archive site http://archive.eclipse.org/eclipse/downloads/ as the "Archived Release (3.3.1)."<br />
<br />
The Eclipse website has several versions available for download; choose either "Eclipse Classic" or "Eclipse IDE for Java Developers."<br />
<br />
Because it is written in Java, Eclipse is very cross-platform; it is available for Windows, Linux, and Mac OSX.<br />
<br />
Installing Eclipse is very straightforward. Eclipse is packaged as a .zip file. Windows itself can natively unzip the compressed file into a directory. If you encounter errors using the Windows decompression tool (see [1]), try using a third-party unzip utility such as 7-zip or WinRAR.<br />
<br />
After you have decompressed Eclipse into a directory, you can run it straight from that directory with no modifications or other "installation" procedure. You may want to move it into C:\Program Files\Eclipse to keep consistent with your other applications, but it can reside in the Desktop or elsewhere as well.<br />
Installing the Hadoop MapReduce Plugin<br />
<br />
Hadoop comes with a plugin for Eclipse that makes developing MapReduce programs easier. In the hadoop-0.18.0/contrib/eclipse-plugin directory on this CD, you will find a file named hadoop-0.18.0-eclipse-plugin.jar. Copy this into the plugins/ subdirectory of wherever you unzipped Eclipse.<br />
Making a Copy of Hadoop<br />
<br />
While we will be running MapReduce programs on the virtual machine, we will be compiling them on the host machine. The host therefore needs a copy of the Hadoop jars to compile your code against. Copy the /hadoop-0.18.0 directory from the CD into a location on your local drive, and remember where this is. You do not need to configure this copy of Hadoop in any way.<br />
<br />
Running Eclipse<br />
<br />
Navigate into the Eclipse directory and run eclipse.exe to start the IDE. Eclipse stores all of your source projects and their related settings in a directory called a workspace.<br />
<br />
Upon starting Eclipse, it will prompt you for a directory to act as the workspace. Choose a directory name that makes sense to you and click OK.<br />
<br />
<br />
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgDUczyFz90MBx9rAhLrZjBSM7F_cDB7FAPQGgD5ZxSit9f-C_k9MOjO0mUa4NkcOVabQ5NWHYgOgYmFpiv-5ijVJugsakr2iOnLydd-MEa9JMPSC9bfQsQqyWkxtozmJnQwpzongOi5Vo/s1600/Hadoop+Eclipse.png" onblur="try {parent.deselectBloggerImageGracefully();} catch(e) {}"><img alt="" border="0" id="BLOGGER_PHOTO_ID_5591369033472572802" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgDUczyFz90MBx9rAhLrZjBSM7F_cDB7FAPQGgD5ZxSit9f-C_k9MOjO0mUa4NkcOVabQ5NWHYgOgYmFpiv-5ijVJugsakr2iOnLydd-MEa9JMPSC9bfQsQqyWkxtozmJnQwpzongOi5Vo/s400/Hadoop+Eclipse.png" style="cursor: hand; cursor: pointer; height: 167px; width: 400px;" /></a><br />
<br />
Configuring the MapReduce Plugin<br />
<br />
In this section, we will walk through the process of configuring Eclipse to switch to the MapReduce perspective and connect to the Hadoop virtual machine.<br />
<br />
Step 1: If you have not already done so, start Eclipse and choose a workspace directory. If you are presented with a "welcome" screen, click the button that says "Go to the Workbench." The Workbench is the main view of Eclipse, where you can write source code, launch programs, and manage your projects.<br />
<br />
Step 2: Start the virtual machine. Double-click on the virtual machine's .vmx file (hadoop-appliance-0.18.0.vmx) in its installation directory to launch the virtual machine. It should begin the Linux boot process.<br />
<br />
Step 3: Switch to the MapReduce perspective. In the upper-right corner of the workbench, click the "Open Perspective" button.<br />
<br />
Select "Other," followed by "Map/Reduce" in the window that opens up. At first, nothing may appear to change. In the menu, choose Window * Show View * Other. Under "MapReduce Tools," select "Map/Reduce Locations." This should make a new panel visible at the bottom of the screen, next to Problems and Tasks.<br />
<br />
Step 4: Add the Server. In the Map/Reduce Locations panel, click on the elephant logo in the upper-right corner to add a new server to Eclipse.<br />
<br />
You will now be asked to fill in a number of parameters identifying the server. To connect to the VMware image, the values are:<br />
<br />
Location name: (Any descriptive name you want; e.g., "VMware server")<br />
Map/Reduce Master Host: (The IP address printed at startup)<br />
Map/Reduce Master Port: 9001<br />
DFS Master Port: 9000<br />
User name: hadoop-user<br />
<br />
Next, click on the "Advanced" tab. There are two settings here which must be changed.<br />
<br />
Scroll down to hadoop.job.ugi. It contains your current Windows login credentials. Highlight the first comma-separated value in this list (your username) and replace it with hadoop-user.<br />
<br />
Next, scroll further down to mapred.system.dir. Erase the current value and set it to /hadoop/mapred/system.<br />
<br />
When you are done, click "Finish." Your server will now appear in the Map/Reduce Locations panel. If you look in the Project Explorer (upper-left corner of Eclipse), you will see that the MapReduce plugin has added the ability to browse HDFS. Click the [+] buttons to expand the directory tree to see any files already there. If you inserted files into HDFS yourself, they will be visible in this tree.</div>
A Virtual Machine Hadoop Environment<br /><br />This section explains how to configure a virtual machine to run Hadoop within your host computer. After installing the virtual machine software and the virtual machine image, you will learn how to log in and run jobs within the Hadoop environment.<br /><br />Users of Linux, Mac OSX, or other Unix-like environments are able to install Hadoop and run it on one (or more) machines with no additional software beyond Java. If you are interested in doing this, there are instructions available on the Hadoop web site in the quickstart document.<br /><br />Running Hadoop on top of Windows requires installing cygwin, a Linux-like environment that runs within Windows. Hadoop works reasonably well on cygwin, but it is officially for "development purposes only." Hadoop on cygwin may be unstable, and installing cygwin itself can be cumbersome.<br /><br />To aid developers in getting started easily with Hadoop, we have provided a virtual machine image containing a preconfigured Hadoop installation. The virtual machine image will run inside of a "sandbox" environment in which we can run another operating system. The OS inside the sandbox does not know that there is another operating environment outside of it; it acts as though it is on its own computer. This sandbox environment is referred to as the "guest machine" running a "guest operating system." The actual physical machine running the VM software is referred to as the "host machine" and it runs the "host operating system." The virtual machine provides other host-machine applications with the appearance that another physical computer is available on the same network. Applications running on the host machine see the VM as a separate machine with its own IP address, and can interact with the programs inside the VM in this fashion.<br /><br /><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEguObbY7pxT2EIrDaUGWNjnhYroiKO6Ll88N3EujhZ74A8ZFVzNHJ7QmThxyaP64z6S2iJsVNhG4Vz6SQIMt8CSEBetmXysPNb4a-rGeidx4zwrAi6rC2_oZmgeDtXno6vVkuxBS-jE3Lc/s1600/Hadoop.png"><img style="width: 400px; height: 279px;" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEguObbY7pxT2EIrDaUGWNjnhYroiKO6Ll88N3EujhZ74A8ZFVzNHJ7QmThxyaP64z6S2iJsVNhG4Vz6SQIMt8CSEBetmXysPNb4a-rGeidx4zwrAi6rC2_oZmgeDtXno6vVkuxBS-jE3Lc/s400/Hadoop.png" border="0" alt="" id="BLOGGER_PHOTO_ID_5591367969667484306" /></a><br /><br />Application developers do not need to use the virtual machine to run Hadoop. Developers on Linux typically use Hadoop in their native development environment, and Windows users often install cygwin for Hadoop development. The virtual machine provided with this tutorial allows users a convenient alternative development platform with a minimum of configuration required. Another advantage of the virtual machine is its easy reset functionality: if your experiments break the Hadoop configuration or render the operating system unusable, you can always simply copy the virtual machine image from the CD back to where you installed it on your computer, and start from a known-good state.<br /><br />Our virtual machine will run Linux, and comes preconfigured to run Hadoop in pseudo-distributed mode on this system. 
(It is configured like a fully distributed system, but is actually running on a single machine instance.) We can write Hadoop programs using editors and other applications of the host platform, and run them on our "cluster" consisting of just the virtual machine. We will connect our host environment to the virtual machine through the network.<br /><br />Note that the virtual machine will also run on a Linux host: Linux users can install the virtual machine software and run the Hadoop VM as well, and the same separation between host processes and guest processes applies there.<br /><br /><span style="font-weight:bold;">Installing VMware Player</span><br /><br />The virtual machine is designed to run inside of the VMware Player. A copy of the VMware Player installer (version 2.5) for both 32-bit Windows and Linux is included here (linux-rpm, linux-bundle, windows-exe). A Getting Started guide for VMware Player provides instructions for installing the VMware Player. Review the license information for VMware Player before using it.<br /><br />If you are running on a different operating system, or would prefer to download a more recent version of the player, an alternate installation strategy is to navigate to http://info.vmware.com/content/GLP_VMwarePlayer. You will need to register for a "virtualization starter kit." You will receive an email with a link to "Download VMware Player." Click the link, then click the "download now" button at the top of the screen under "most recent version" and follow the instructions. VMware Player is available for Windows or Linux. The latter is available in both 32- and 64-bit versions.<br /><br />VMware Player itself is approximately a 170 MB download. When the download has completed, run the installer program to set up VMware Player, and follow the prompts as directed. Installation in Windows is performed by a typical Windows installation process.<br /><br /><span style="font-weight:bold;">Setting up the Virtual Environment</span><br /><br />Next, copy the Hadoop Virtual Machine into a location on your hard drive. It is a zipped vmware folder (hadoop-vm-appliance-0-18-0) which includes a few files: a .vmdk file that is a snapshot of the virtual machine's hard drive, and a .vmx file which contains the configuration information to start the virtual machine. After unzipping the folder, start the virtual machine by double-clicking on the hadoop-appliance-0.18.0.vmx file in Windows Explorer.<br /><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjltm3pUEu6t5pmWFelu2QBGCLMiNJ5ZM3ZCckW6rQ_PoH1fwZJcGHZfSyjFBMba5NGlm0x8kAQeHyGYIkMbhKLCPIdSlHtNcd8JaezNfL23eSuI10JaeyjgaX-r4AxK6yU5Ck6NDd_vIw/s1600/Hadoop+Vmplayer.png"><img style="width: 400px; height: 317px;" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjltm3pUEu6t5pmWFelu2QBGCLMiNJ5ZM3ZCckW6rQ_PoH1fwZJcGHZfSyjFBMba5NGlm0x8kAQeHyGYIkMbhKLCPIdSlHtNcd8JaezNfL23eSuI10JaeyjgaX-r4AxK6yU5Ck6NDd_vIw/s400/Hadoop+Vmplayer.png" border="0" alt="" id="BLOGGER_PHOTO_ID_5591368457404734962" /></a><br /><br />When you start the virtual machine for the first time, VMware Player will recognize that the virtual machine image is not in the same location it used to be. You should inform VMware Player that you copied this virtual machine image. VMware Player will then generate new session identifiers for this instance of the virtual machine. 
If you later move the VM image to a different location on your own hard drive, you should tell VMware Player that you have moved the image.<br /><br />If you ever corrupt the VM image (e.g., by inadvertently deleting or overwriting important files), you can always restore a pristine copy of the virtual machine by copying a fresh VM image off of this tutorial CD. (So don't be shy about exploring! You can always reset it to a functioning state.)<br /><br />After you select this option and click OK, the virtual machine should begin booting normally. You will see it perform the standard boot procedure for a Linux system. It will bind itself to an IP address on an unused network segment, and then display a prompt allowing a user to log in.<br /><br />Virtual Machine User Accounts<br /><br />The virtual machine comes preconfigured with two user accounts: "root" and "hadoop-user". The hadoop-user account has sudo permissions to perform system management functions, such as shutting down the virtual machine. The vast majority of your interaction with the virtual machine will be as hadoop-user.<br /><br />To log in as hadoop-user, first click inside the virtual machine's display. The virtual machine will take control of your keyboard and mouse. To escape back into Windows at any time, press CTRL+ALT at the same time. The hadoop-user account's password is hadoop. To log in as root, the password is root.<br />Running a Hadoop Job<br /><br />Now that the VM is started, or you have installed Hadoop on your own system in pseudo-distributed mode, let us make sure that Hadoop is properly configured.<br /><br />If you are using the VM, log in as hadoop-user, as directed above. You will start in your home directory: /home/hadoop-user. Typing ls, you will see a directory named hadoop/, as well as a set of scripts to manage the server. The virtual machine's hostname is hadoop-desk.<br /><br />First, we must start the Hadoop system. Type the following command:<br /><br />hadoop-user@hadoop-desk:~$ ./start-hadoop<br /><br />If you installed Hadoop on your host system, use the following commands to launch hadoop (assuming you installed to ~/hadoop):<br /><br />you@your-machine:~$ cd hadoop<br />you@your-machine:~/hadoop$ bin/start-all.sh<br /><br />You will see a set of status messages appear as the services boot. If prompted whether it is okay to connect to the current host, type "yes". Try running an example program to ensure that Hadoop is correctly configured:<br /><br />hadoop-user@hadoop-desk:~$ cd hadoop<br />hadoop-user@hadoop-desk:~/hadoop$ bin/hadoop jar hadoop-0.18.0-examples.jar pi 10 1000000<br /><br />This should provide output that looks something like this:<br /><br />Wrote input for Map #1<br />Wrote input for Map #2<br />Wrote input for Map #3<br />...<br />Wrote input for Map #10<br />Starting Job<br />INFO mapred.FileInputFormat: Total input paths to process: 10<br />INFO mapred.JobClient: Running job: job_200806230804_0001<br />INFO mapred.JobClient: map 0% reduce 0%<br />INFO mapred.JobClient: map 10% reduce 0%<br />...<br />INFO mapred.JobClient: map 100% reduce 100%<br />INFO mapred.JobClient: Job complete: job_200806230804_0001<br />...<br />Job Finished in 25.841 seconds<br />Estimated value of PI is 3.141688<br /><br />This task runs a simulation to estimate the value of pi based on sampling. The test first wrote out a number of points to a list of files, one per map task. It then calculated an estimate of pi based on these points, in the MapReduce task itself. 
How MapReduce works and how to write such a program are discussed in the next module. The Hadoop client program you used to launch the pi test submitted the job, displayed some progress information as the job proceeded, and then displayed some final performance counters and the job-specific output: an estimate for the value of pi.<br />Accessing the VM via ssh<br /><br />Rather than directly using the terminal of the virtual machine, you can also log in "remotely" over ssh from the host environment. Using an ssh client like putty (in Windows), log in with username "hadoop-user" (password hadoop) to the IP address displayed in the virtual machine terminal when it starts up. You can now interact with this virtual machine as if it were another Linux machine on the network.<br /><br />This can only be done from the host machine. The VMware image is, by default, configured to use host-only networking; only the host machine can talk to the virtual machine over its network interface. The virtual machine does not appear on the actual external network. This is done for security purposes.<br /><br />If you need to find the virtual machine's IP address later, the ifconfig command will display this under the "inet addr" field.<br /><br />Important security note: In the VMware settings, you can reconfigure the virtual machine for networked access rather than host-only networking. If you enable network access, you can access the virtual machine from anywhere else on the network via its IP address. In this case, you should change the passwords associated with the accounts on the virtual machine to prevent unauthorized users from logging in with the default password.<br />Shutting Down the VM<br /><br />When you are done with the virtual machine, you can turn it off by logging in as hadoop-user and typing sudo poweroff. The virtual machine will shut itself down in an orderly fashion and the window it runs in will disappear.<br /><br />HDFS Web Interface<br /><br />HDFS exposes a web server which is capable of performing basic status monitoring and file browsing operations. By default this is exposed on port 50070 on the NameNode. Accessing http://namenode:50070/ with a web browser will return a page containing overview information about the health, capacity, and usage of the cluster (similar to the information returned by bin/hadoop dfsadmin -report).<br /><br />The address and port where the web interface listens can be changed by setting dfs.http.address in conf/hadoop-site.xml. It must be of the form address:port. To accept requests on all addresses, use 0.0.0.0.<br /><br />From this interface, you can browse HDFS itself with a basic file-browser interface. Each DataNode exposes its file browser interface on port 50075. You can override this by setting the dfs.datanode.http.address configuration key to a setting other than 0.0.0.0:50075. Log files generated by the Hadoop daemons can be accessed through this interface, which is useful for distributed debugging and troubleshooting.<br /><br />
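For example, a conf/hadoop-site.xml entry along these lines would move the NameNode status server to port 8080 on all interfaces (the port here is purely illustrative; the default is 0.0.0.0:50070):<br /><br />
<property><br />
  <name>dfs.http.address</name><br />
  <value>0.0.0.0:8080</value><br />
</property><br />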
Rack Awareness<br /><br />For small clusters in which all servers are connected by a single switch, there are only two levels of locality: "on-machine" and "off-machine." When loading data from a DataNode's local drive into HDFS, the NameNode will schedule one copy to go into the local DataNode, and will pick two other machines at random from the cluster.<br /><br />For larger Hadoop installations which span multiple racks, it is important to ensure that replicas of data exist on multiple racks. This way, the loss of a switch does not render portions of the data unavailable due to all replicas being underneath it.<br /><br />HDFS can be made rack-aware by the use of a script which allows the master node to map the network topology of the cluster. While alternate configuration strategies can be used, the default implementation allows you to provide an executable script which returns the "rack address" of each of a list of IP addresses.<br /><br />The network topology script receives as arguments one or more IP addresses of nodes in the cluster. It returns on stdout a list of rack names, one for each input. The input and output order must be consistent.<br /><br />To set the rack mapping script, specify the key topology.script.file.name in conf/hadoop-site.xml. This provides a command to run to return a rack id; it must be an executable script or program. By default, Hadoop will attempt to send a set of IP addresses to the script as several separate command line arguments. You can control the maximum acceptable number of arguments with the topology.script.number.args key.<br /><br />Rack ids in Hadoop are hierarchical and look like path names. By default, every node has a rack id of /default-rack. You can set rack ids for nodes to any arbitrary path, e.g., /foo/bar-rack. Path elements further to the left are higher up the tree. Thus a reasonable structure for a large installation may be /top-switch-name/rack-name.<br /><br />Hadoop rack ids are not currently expressive enough to handle an unusual routing topology such as a 3-d torus; they assume that each node is connected to a single switch which in turn has a single upstream switch. This is not usually a problem, however. Actual packet routing will be directed using the topology discovered by or set in switches and routers. The Hadoop rack ids will be used to find "near" and "far" nodes for replica placement (and in 0.17, MapReduce task placement).<br /><br />The following example script performs rack identification based on IP addresses, given a hierarchical IP addressing scheme enforced by the network administrator. This may work directly for simple installations; more complex network configurations may require a file- or table-based lookup process. Care should be taken in that case to keep the table up-to-date as nodes are physically relocated, etc. This script requires that the maximum number of arguments be set to 1.<br /><br />#!/bin/bash<br /># Set rack id based on IP address.<br /># Assumes the network administrator has complete control<br /># over IP addresses assigned to nodes and that they are<br /># in the 10.x.y.z address space. Assumes that<br /># IP addresses are distributed hierarchically, e.g.,<br /># 10.1.y.z is one data center segment and 10.2.y.z is another;<br /># 10.1.1.z is one rack, 10.1.2.z is another rack in<br /># the same segment, etc.<br />#<br /># This is invoked with an IP address as its only argument.<br /><br /># get the IP address from the first argument<br /># (note: $1, not $0 -- $0 is the name of the script itself)<br />ipaddr=$1<br /><br /># select "x.y" and convert it to "x/y"<br />segments=`echo $ipaddr | cut --delimiter=. --fields=2-3 --output-delimiter=/`<br />echo /${segments}<br />
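To wire this script in, the corresponding conf/hadoop-site.xml entries might look like the following sketch (the script path is hypothetical; the argument count is set to 1 to match the script above):<br /><br />
<property><br />
  <name>topology.script.file.name</name><br />
  <value>/home/hadoop/topology.sh</value><br />
</property><br />
<property><br />
  <name>topology.script.number.args</name><br />
  <value>1</value><br />
</property><br />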
Additional HDFS Tasks<br /><br />Rebalancing Blocks<br /><br />New nodes can be added to a cluster in a straightforward manner. On the new node, the same Hadoop version and configuration (conf/hadoop-site.xml) as on the rest of the cluster should be installed. Starting the DataNode daemon on the machine will cause it to contact the NameNode and join the cluster. (The new node should be added to the slaves file on the master server as well, to inform the master how to invoke script-based commands on the new node.)<br /><br />But the new DataNode will have no data on board initially; it is therefore not alleviating space concerns on the existing nodes. New files will be stored on the new DataNode in addition to the existing ones, but for optimum usage, storage should be evenly balanced across all nodes.<br /><br />This can be achieved with the automatic balancer tool included with Hadoop. The Balancer class will intelligently balance blocks across the nodes to achieve an even distribution of blocks within a given threshold, expressed as a percentage. (The default is 10%.) Smaller percentages make nodes more evenly balanced, but may require more time to achieve this state. Perfect balancing (0%) is unlikely to actually be achieved.<br /><br />The balancer script can be run by starting bin/start-balancer.sh in the Hadoop directory. The script can be provided a balancing threshold percentage with the -threshold parameter; e.g., bin/start-balancer.sh -threshold 5. The balancer will automatically terminate when it achieves its goal, or when an error occurs, or it cannot find more candidate blocks to move to achieve better balance. The balancer can always be terminated safely by the administrator by running bin/stop-balancer.sh.<br /><br />The balancing script can be run when nobody else is using the cluster (e.g., overnight), but it can also be run in an "online" fashion while many other jobs are on-going. To prevent the rebalancing process from consuming large amounts of bandwidth and significantly degrading the performance of other processes on the cluster, the dfs.balance.bandwidthPerSec configuration parameter can be used to limit the number of bytes/sec each node may devote to rebalancing its data store.<br />Copying Large Sets of Files<br /><br />When migrating a large number of files from one location to another (either from one HDFS cluster to another, from S3 into HDFS or vice versa, etc.), the task should be divided between multiple nodes to allow them all to share in the bandwidth required for the process. Hadoop includes a tool called distcp for this purpose.<br /><br />By invoking bin/hadoop distcp src dest, Hadoop will start a MapReduce task to distribute the burden of copying a large number of files from src to dest. These two parameters may specify a full URL for the path to copy. e.g., "hdfs://SomeNameNode:9000/foo/bar/" and "hdfs://OtherNameNode:2000/baz/quux/" will copy the children of /foo/bar on one cluster to the directory tree rooted at /baz/quux on the other. The paths are assumed to be directories, and are copied recursively. S3 URLs can be specified with s3://bucket-name/key.<br />
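Spelled out as a single command, the example above would be invoked like so (the host names and ports are of course placeholders):<br /><br />
bin/hadoop distcp hdfs://SomeNameNode:9000/foo/bar/ hdfs://OtherNameNode:2000/baz/quux/<br />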
Decommissioning Nodes<br /><br />In addition to allowing nodes to be added to the cluster on the fly, nodes can also be removed from a cluster while it is running, without data loss. But if nodes are simply shut down "hard," data loss may occur, as they may hold the sole copy of one or more file blocks.<br /><br />Nodes must be retired on a schedule that allows HDFS to ensure that no block has all of its replicas within the set of DataNodes to be retired.<br /><br />HDFS provides a decommissioning feature which ensures that this process is performed safely. To use it, follow the steps below:<br /><br />Step 1: Cluster configuration. If it is assumed that nodes may be retired in your cluster, then before it is started, an excludes file must be configured. Add a key named dfs.hosts.exclude to your conf/hadoop-site.xml file. The value associated with this key provides the full path to a file on the NameNode's local file system which contains a list of machines which are not permitted to connect to HDFS.<br /><br />Step 2: Determine hosts to decommission. Each machine to be decommissioned should be added to the file identified by dfs.hosts.exclude, one per line. This will prevent them from connecting to the NameNode.<br /><br />Step 3: Force configuration reload. Run the command bin/hadoop dfsadmin -refreshNodes. This will force the NameNode to reread its configuration, including the newly-updated excludes file. It will decommission the nodes over a period of time, allowing time for each node's blocks to be replicated onto machines which are scheduled to remain active.<br /><br />Step 4: Shutdown nodes. After the decommission process has completed, the decommissioned hardware can be safely shut down for maintenance, etc. The bin/hadoop dfsadmin -report command will describe which nodes are connected to the cluster.<br /><br />Step 5: Edit excludes file again. Once the machines have been decommissioned, they can be removed from the excludes file. Running bin/hadoop dfsadmin -refreshNodes again will read the excludes file back into the NameNode, allowing the DataNodes to rejoin the cluster after maintenance has been completed, or additional capacity is needed in the cluster again, etc.<br />
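For Step 1, the configuration entry is a single key pointing at the excludes file; a sketch (the path is hypothetical, and the file itself lists one hostname per line):<br /><br />
<property><br />
  <name>dfs.hosts.exclude</name><br />
  <value>/home/hadoop/excludes</value><br />
</property><br />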
Verifying File System Health<br /><br />After decommissioning nodes, restarting a cluster, or periodically during its lifetime, you may want to ensure that the file system is healthy--that files are not corrupted or under-replicated, and that blocks are not missing.<br /><br />Hadoop provides an fsck command to do exactly this. It can be launched at the command line like so:<br /><br /> bin/hadoop fsck [path] [options]<br /><br />If run with no arguments, it will print usage information and exit. If run with the argument /, it will check the health of the entire file system and print a report. If provided with a path to a particular directory or file, it will only check files under that path. If an option argument is given but no path, it will start from the file system root (/). The options may include two different types of options:<br /><br />Action options specify what action should be taken when corrupted files are found. This can be -move, which moves corrupt files to /lost+found, or -delete, which deletes corrupted files.<br /><br />Information options specify how verbose the tool should be in its report. The -files option will list all files it checks as it encounters them. This information can be further expanded by adding the -blocks option, which prints the list of blocks for each file. Adding -locations to these two options will then print the addresses of the DataNodes holding these blocks. Still more information can be retrieved by adding -racks to the end of this list, which then prints the rack topology information for each location. (See the earlier section on rack awareness for more information on configuring network topology.) Note that the latter options do not imply the former; you must use them in conjunction with one another. Also, note that the Hadoop program uses -files in a "common argument parser" shared by the different commands such as dfsadmin, fsck, dfs, etc. This means that if you omit a path argument to fsck, it will not receive the -files option that you intend. You can separate common options from fsck-specific options by using -- as an argument, like so:<br /><br /> bin/hadoop fsck -- -files -blocks<br /><br />The -- is not required if you provide a path to start the check from, or if you specify another argument first such as -move.<br /><br />By default, fsck will not operate on files still open for write by another client. A list of such files can be produced with the -openforwrite option.<br />
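Putting the information options together, a full health check of the entire file system with per-file, per-block, and per-location detail would be:<br /><br />
 bin/hadoop fsck / -files -blocks -locations<br />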
HDFS Permissions and Security<br /><br />Starting with Hadoop 0.16.1, HDFS has included a rudimentary file permissions system. This permission system is based on the POSIX model, but does not provide strong security for HDFS files. The HDFS permissions system is designed to prevent accidental corruption of data or casual misuse of information within a group of users who share access to a cluster. It is not a strong security model that guarantees denial of access to unauthorized parties.<br /><br />HDFS security is based on the POSIX model of users and groups. Each file or directory has three permissions (read, write and execute) associated with it at three different granularities: the file's owner, users in the same group as the owner, and all other users in the system. As HDFS does not provide the full POSIX spectrum of activity, some combinations of bits will be meaningless. For example, no file can be executed; the +x bits cannot be set on files (only directories). Nor can an existing file be written to, although the +w bits may still be set.<br /><br />Security permissions and ownership can be modified using the bin/hadoop dfs -chmod, -chown, and -chgrp operations described earlier in this document; they work in a similar fashion to the POSIX/Linux tools of the same name.<br /><br />Determining identity - Identity is not authenticated formally with HDFS; it is taken from an extrinsic source. The Hadoop system is programmed to use the user's current login as their Hadoop username (i.e., the equivalent of whoami). The user's current working group list (i.e., the output of groups) is used as the group list in Hadoop. HDFS itself does not verify that this username genuinely belongs to the actual operator.<br /><br />Superuser status - The username which was used to start the Hadoop process (i.e., the username who actually ran bin/start-all.sh or bin/start-dfs.sh) is acknowledged to be the superuser for HDFS. If this user interacts with HDFS, he does so with a special username superuser. This user's operations on HDFS never fail, regardless of permission bits set on the particular files he manipulates. If Hadoop is shut down and restarted under a different username, that username is then bound to the superuser account.<br /><br />Supergroup - There is also a special group named supergroup, whose membership is controlled by the configuration parameter dfs.permissions.supergroup.<br /><br />Disabling permissions - By default, permissions are enabled on HDFS. The permission system can be disabled by setting the configuration option dfs.permissions to false. The owner, group, and permissions bits associated with each file and directory will still be preserved, but the HDFS process does not enforce them, except when using permissions-related operations such as -chmod.<br />
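As a quick illustration, the permissions tools mirror their POSIX namesakes; the file path here is purely hypothetical:<br /><br />
bin/hadoop dfs -chmod 640 /user/hadoop-user/somefile<br />
bin/hadoop dfs -chown hadoop-user:supergroup /user/hadoop-user/somefile<br />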
<span style="font-weight:bold;">Using HDFS Programmatically</span><br /><br />While HDFS can be manipulated explicitly through user commands, or implicitly as the input to or output from a Hadoop MapReduce job, you can also work with HDFS inside your own Java applications. (A JNI-based wrapper, libhdfs, also provides this functionality in C/C++ programs.)<br /><br />This section provides a short tutorial on using the Java-based HDFS API. It will be based on the following code listing:<br /><br />1: import java.io.File;<br />2: import java.io.IOException;<br />3:<br />4: import org.apache.hadoop.conf.Configuration;<br />5: import org.apache.hadoop.fs.FileSystem;<br />6: import org.apache.hadoop.fs.FSDataInputStream;<br />7: import org.apache.hadoop.fs.FSDataOutputStream;<br />8: import org.apache.hadoop.fs.Path;<br />9:<br />10: public class HDFSHelloWorld {<br />11:<br />12: public static final String theFilename = "hello.txt";<br />13: public static final String message = "Hello, world!\n";<br />14:<br />15: public static void main (String [] args) throws IOException {<br />16:<br />17: Configuration conf = new Configuration();<br />18: FileSystem fs = FileSystem.get(conf);<br />19:<br />20: Path filenamePath = new Path(theFilename);<br />21:<br />22: try {<br />23: if (fs.exists(filenamePath)) {<br />24: // remove the file first<br />25: fs.delete(filenamePath);<br />26: }<br />27:<br />28: FSDataOutputStream out = fs.create(filenamePath);<br />29: out.writeUTF(message);<br />30: out.close();<br />31:<br />32: FSDataInputStream in = fs.open(filenamePath);<br />33: String messageIn = in.readUTF();<br />34: System.out.print(messageIn);<br />35: in.close();<br />36: } catch (IOException ioe) {<br />37: System.err.println("IOException during operation: " + ioe.toString());<br />38: System.exit(1);<br />39: }<br />40: }<br />41: }<br /><br />This program creates a file named hello.txt, writes a short message into it, then reads it back and prints it to the screen. If the file already exists, it is deleted first.<br /><br />First we get a handle to an abstract FileSystem object, as specified by the application configuration. The Configuration object created uses the default parameters.<br /><br />17: Configuration conf = new Configuration();<br />18: FileSystem fs = FileSystem.get(conf);<br /><br />The FileSystem interface actually provides a generic abstraction suitable for use in several file systems. Depending on the Hadoop configuration, this may use HDFS, the local file system, or a different one altogether. If this test program is launched via the ordinary 'java classname' command line, it may not find conf/hadoop-site.xml and will use the local file system. To ensure that it uses the proper Hadoop configuration, launch this program through Hadoop by putting it in a jar and running:<br /><br />$HADOOP_HOME/bin/hadoop jar yourjar HDFSHelloWorld<br /><br />Regardless of how you launch the program and which file system it connects to, writing to a file is done in the same way:<br /><br />28: FSDataOutputStream out = fs.create(filenamePath);<br />29: out.writeUTF(message);<br />30: out.close();<br /><br />First we create the file with the fs.create() call, which returns an FSDataOutputStream used to write data into the file. We then write the information using ordinary stream writing functions; FSDataOutputStream extends the java.io.DataOutputStream class. When we are done with the file, we close the stream with out.close().<br /><br />This call to fs.create() will overwrite the file if it already exists, but for the sake of example, this program explicitly removes the file first anyway (note that depending on this explicit prior removal is technically a race condition). Testing for whether a file exists and removing an existing file are performed by lines 23-26:<br /><br />23: if (fs.exists(filenamePath)) {<br />24: // remove the file first<br />25: fs.delete(filenamePath);<br />26: }<br /><br />Other operations such as copying, moving, and renaming are equally straightforward operations on Path objects performed by the FileSystem; a short sketch of these appears at the end of this section.<br /><br />Finally, we re-open the file for reading, pull the bytes from the file, converting them to a UTF-8 encoded string in the process, and print them to the screen:<br /><br />32: FSDataInputStream in = fs.open(filenamePath);<br />33: String messageIn = in.readUTF();<br />34: System.out.print(messageIn);<br />35: in.close();<br /><br />The fs.open() method returns an FSDataInputStream, which subclasses java.io.DataInputStream. Data can be read from the stream using the readUTF() operation, as on line 33. When we are done with the stream, we call close() to free the handle associated with the file.<br /><br />More information:<br /><br />Complete JavaDoc for the HDFS API is provided at http://hadoop.apache.org/common/docs/r0.20.2/api/index.html.<br /><br />A direct link to the FileSystem interface is: http://hadoop.apache.org/common/docs/r0.20.2/api/org/apache/hadoop/fs/FileSystem.html.<br /><br />Another example HDFS application is available on the Hadoop wiki. This implements a file copy operation.
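To make the copying, moving, and renaming remark above concrete, here is a minimal sketch against the same FileSystem API; the destination names are hypothetical, and hello.txt is the file created by HDFSHelloWorld. FileUtil.copy is a convenience helper that can also copy between two different file systems:<br /><br />
import org.apache.hadoop.conf.Configuration;<br />
import org.apache.hadoop.fs.FileStatus;<br />
import org.apache.hadoop.fs.FileSystem;<br />
import org.apache.hadoop.fs.FileUtil;<br />
import org.apache.hadoop.fs.Path;<br />
<br />
public class FsOpsSketch {<br />
  public static void main(String[] args) throws Exception {<br />
    Configuration conf = new Configuration();<br />
    FileSystem fs = FileSystem.get(conf);<br />
<br />
    // Rename (move) within the same file system.<br />
    fs.rename(new Path("hello.txt"), new Path("hello-renamed.txt"));<br />
<br />
    // Copy the renamed file; the 'false' argument keeps the source in place.<br />
    FileUtil.copy(fs, new Path("hello-renamed.txt"),<br />
                  fs, new Path("hello-copy.txt"), false, conf);<br />
<br />
    // List the home directory, much like bin/hadoop dfs -ls<br />
    for (FileStatus stat : fs.listStatus(fs.getHomeDirectory())) {<br />
      System.out.println(stat.getPath() + "\t" + stat.getLen() + " bytes");<br />
    }<br />
  }<br />
}<br />
<br />Like HDFSHelloWorld, this should be packaged in a jar and launched through bin/hadoop jar so that it picks up the cluster configuration.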
<span style="font-weight:bold;">HDFS Command Reference</span><br /><br />There are many more commands in bin/hadoop dfs than were demonstrated here, although these basic operations will get you started. Running bin/hadoop dfs with no additional arguments will list all commands that can be run with the FsShell system. Furthermore, bin/hadoop dfs -help commandName will display a short usage summary for the operation in question, if you are stuck.<br /><br />A table of all operations is reproduced below. The following conventions are used for parameters:<br /><br /> * italics denote variables to be filled out by the user.<br /> * "path" means any file or directory name.<br /> * "path..." means one or more file or directory names.<br /> * "file" means any filename.<br /> * "src" and "dest" are path names in a directed operation.<br /> * "localSrc" and "localDest" are paths as above, but on the local file system. All other file and path names refer to objects inside HDFS.<br /> * Parameters in [brackets] are optional.<br /><br /><a onblur="try {parent.deselectBloggerImageGracefully();} catch(e) {}" href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjt88LNzDNjED2lsJKOW8KHrhvdj6JufP_Hy7HL4CHNPIqxjfax2G1PMIV1ChStFEOGtuoszC5VVEVY5_XcnKyk4dQ2UgEOC5APE7u8pwwmHiSTZ64jksNtI_uYDXQtOU0AjqECIwOapZY/s1600/Hadoop+Commands.jpg"><img style="cursor:pointer; cursor:hand;width: 400px; height: 287px;" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjt88LNzDNjED2lsJKOW8KHrhvdj6JufP_Hy7HL4CHNPIqxjfax2G1PMIV1ChStFEOGtuoszC5VVEVY5_XcnKyk4dQ2UgEOC5APE7u8pwwmHiSTZ64jksNtI_uYDXQtOU0AjqECIwOapZY/s400/Hadoop+Commands.jpg" border="0" alt="" id="BLOGGER_PHOTO_ID_5591366250207749666" /></a><br /><a onblur="try {parent.deselectBloggerImageGracefully();} catch(e) {}" href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjKRayGibkFlqTLnYfrXELIKHEGO4oQUSNuYeQgOqhbcfnbFS3slqjECsEqXKoIotZK-uOSP-pg9KAObTrseBx8Uf4uSjJ0lZ5LfqDWYwdH0vJvgn05TpFf74ehcBFw_PkgAQel14YFpv4/s1600/Hadoop+Commands-2.jpg"><img style="cursor:pointer; cursor:hand;width: 400px; height: 333px;" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjKRayGibkFlqTLnYfrXELIKHEGO4oQUSNuYeQgOqhbcfnbFS3slqjECsEqXKoIotZK-uOSP-pg9KAObTrseBx8Uf4uSjJ0lZ5LfqDWYwdH0vJvgn05TpFf74ehcBFw_PkgAQel14YFpv4/s400/Hadoop+Commands-2.jpg" border="0" alt="" id="BLOGGER_PHOTO_ID_5591366453762036786" /></a><br /><br />DFSAdmin Command Reference<br /><br />While the dfs module for bin/hadoop provides common file and directory manipulation commands, the commands in it all operate on objects within the file system. The dfsadmin module, by contrast, manipulates or queries the file system as a whole. The operation of the commands in this module is described in this section.<br /><br />Getting overall status: A brief status report for HDFS can be retrieved with bin/hadoop dfsadmin -report. This returns basic information about the overall health of the HDFS cluster, as well as some per-server metrics.<br /><br />More involved status: If you need to know more details about the state of the NameNode's metadata, the command bin/hadoop dfsadmin -metasave filename will record this information in filename. The metasave command will enumerate lists of blocks which are under-replicated, in the process of being replicated, and scheduled for deletion. NB: The help for this command states that it "saves NameNode's primary data structures," but this is a misnomer; the NameNode's state cannot be restored from this information. However, it will provide good information about how the NameNode is managing HDFS's blocks.<br /><br />Safemode: Safemode is an HDFS state in which the file system is mounted read-only; no replication is performed, nor can files be created or deleted. This state is entered automatically as the NameNode starts, to allow all DataNodes time to check in with the NameNode and announce which blocks they hold, before the NameNode determines which blocks are under-replicated, etc. The NameNode waits until a specific percentage of the blocks are present and accounted for; this is controlled in the configuration by the dfs.safemode.threshold.pct parameter. After this threshold is met, safemode is automatically exited, and HDFS allows normal operations.
The bin/hadoop dfsadmin -safemode what command allows the user to manipulate safemode based on the value of what, described below:<br /><br /> * enter - Enters safemode<br /> * leave - Forces the NameNode to exit safemode<br /> * get - Returns a string indicating whether safemode is ON or OFF<br /> * wait - Waits until safemode has exited and returns<br /><br />Changing HDFS membership - When decommissioning nodes, it is important to disconnect nodes from HDFS gradually to ensure that data is not lost. See the section on decommissioning later in this document for an explanation of the use of the -refreshNodes dfsadmin command.<br /><br />Upgrading HDFS versions - When upgrading from one version of Hadoop to the next, the file formats used by the NameNode and DataNodes may change. When you first start the new version of Hadoop on the cluster, you need to tell Hadoop to change the HDFS version (or else it will not mount), using the command: bin/start-dfs.sh -upgrade. It will then begin upgrading the HDFS version. The status of an ongoing upgrade operation can be queried with the bin/hadoop dfsadmin -upgradeProgress status command. More verbose information can be retrieved with bin/hadoop dfsadmin -upgradeProgress details. If the upgrade is blocked and you would like to force it to continue, use the command: bin/hadoop dfsadmin -upgradeProgress force. (Note: be sure you know what you are doing if you use this last command.)<br /><br />When HDFS is upgraded, Hadoop retains backup information allowing you to downgrade to the original HDFS version in case you need to revert Hadoop versions. To back out the changes, stop the cluster, re-install the older version of Hadoop, and then use the command: bin/start-dfs.sh -rollback. It will restore the previous HDFS state.<br /><br />Only one such archival copy can be kept at a time. Thus, after a few days of operation with the new version (when it is deemed stable), the archival copy can be removed with the command bin/hadoop dfsadmin -finalizeUpgrade. The rollback command cannot be issued after this point. Finalizing must be performed before a second Hadoop upgrade is allowed.<br /><br />Getting help - As with the dfs module, typing bin/hadoop dfsadmin -help cmd will provide more usage information about the particular command.<br /><br /><span style="font-weight:bold;">Starting HDFS</span><br /><br />Now we must format the file system that we just configured:<br /><br /> user@namenode:hadoop$ bin/hadoop namenode -format<br /><br />This process should be performed only once. When it is complete, we are free to start the distributed file system:<br /><br /> user@namenode:hadoop$ bin/start-dfs.sh<br /><br />This command will start the NameNode server on the master machine (which is where the start-dfs.sh script was invoked). It will also start the DataNode instances on each of the slave machines. In a single-machine "cluster," this is the same machine as the NameNode instance. On a real cluster of two or more machines, this script will ssh into each slave machine and start a DataNode instance.<br /><br /><span style="font-weight:bold;">Configuring HDFS</span><br /><br />The HDFS for your cluster can be configured in a very short amount of time.
First we will fill out the relevant sections of the Hadoop configuration file, then format the NameNode.<br /><br /><span style="font-weight:bold;">Cluster configuration</span><br /><br />These instructions for cluster configuration assume that you have already downloaded and unzipped a copy of Hadoop. Module 3 discusses getting started with Hadoop for this tutorial. Module 7 discusses how to set up a larger cluster and provides preliminary setup instructions for Hadoop, including downloading prerequisite software.<br /><br />The HDFS configuration is located in a set of XML files in the Hadoop configuration directory, conf/ under the main Hadoop install directory (where you unzipped Hadoop to). The conf/hadoop-defaults.xml file contains default values for every parameter in Hadoop. This file is considered read-only. You override this configuration by setting new values in conf/hadoop-site.xml. This file should be replicated consistently across all machines in the cluster. (It is also possible, though not advisable, to host it on NFS.)<br /><br />Configuration settings are a set of key-value pairs of the format:<br /><br /> <property><br /> <name>property-name</name><br /> <value>property-value</value><br /> </property><br /><br />Adding the line <final>true</final> inside the property body will prevent the property from being overridden by user applications. This is useful for most system-wide configuration options.<br /><br />The following settings are necessary to configure HDFS; they are described individually below:<br /><a onblur="try {parent.deselectBloggerImageGracefully();} catch(e) {}" href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiVo_2huYu2kL3WeGydgVpA2NRSz2FP2DFc5Gxn2hsuObCrc3hQtzhDVVFkWK27kBXBu7H8Nr47I4wM311b-3EtYcnFfyYKEnUuW47TouJe-eAjk4XC4M3bmDnQz9PqhHWxzTIkNLZ6bwY/s1600/HDFS+Settings.jpg"><img style="cursor:pointer; cursor:hand;width: 400px; height: 88px;" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiVo_2huYu2kL3WeGydgVpA2NRSz2FP2DFc5Gxn2hsuObCrc3hQtzhDVVFkWK27kBXBu7H8Nr47I4wM311b-3EtYcnFfyYKEnUuW47TouJe-eAjk4XC4M3bmDnQz9PqhHWxzTIkNLZ6bwY/s400/HDFS+Settings.jpg" border="0" alt="" id="BLOGGER_PHOTO_ID_5591362163320098098" /></a><br /><br />fs.default.name - This is the URI (protocol specifier, hostname, and port) that describes the NameNode for the cluster. Each node in the system on which Hadoop is expected to operate needs to know the address of the NameNode. The DataNode instances will register with this NameNode, and make their data available through it. Individual client programs will connect to this address to retrieve the locations of actual file blocks.<br /><br />dfs.data.dir - This is the path on the local file system in which the DataNode instance should store its data. It is not necessary that all DataNode instances store their data under the same local path prefix, as they will all be on separate machines; it is acceptable that these machines are heterogeneous. However, it will simplify configuration if this directory is standardized throughout the system. By default, Hadoop will place this under /tmp. This is fine for testing purposes, but is an easy way to lose actual data in a production system, and thus must be overridden.<br /><br />dfs.name.dir - This is the path on the local file system of the NameNode instance where the NameNode metadata is stored.
It is only used by the NameNode instance to find its information, and does not exist on the DataNodes. The caveat above about /tmp applies to this setting as well; it must be overridden in a production system.<br /><br />Another configuration parameter, not listed above, is dfs.replication. This is the default replication factor for each block of data in the file system. For a production cluster, this should usually be left at its default value of 3. (You are free to increase the replication factor, though this may be unnecessary and use more space than is required. Fewer than three replicas can impact the availability of information, and possibly the reliability of its storage.)<br /><br />The following information can be pasted into the hadoop-site.xml file for a single-node configuration:<br /><br /><configuration><br /> <property><br /> <name>fs.default.name</name><br /> <value>hdfs://your.server.name.com:9000</value><br /> </property><br /> <property><br /> <name>dfs.data.dir</name><br /> <value>/home/username/hdfs/data</value><br /> </property><br /> <property><br /> <name>dfs.name.dir</name><br /> <value>/home/username/hdfs/name</value><br /> </property><br /></configuration><br /><br />Of course, your.server.name.com needs to be changed, as does username. Using port 9000 for the NameNode is arbitrary. (A short sketch of how client programs consume these settings appears at the end of this section.)<br /><br />After copying this information into your conf/hadoop-site.xml file, copy it to the conf/ directories on all machines in the cluster.<br /><br />The master node needs to know the addresses of all the machines to use as DataNodes; the startup scripts depend on this. Also in the conf/ directory, edit the file slaves so that it contains a list of fully-qualified hostnames for the slave instances, one host per line. On a multi-node setup, the master node (e.g., localhost) is not usually present in this file.<br /><br />Then create the necessary directories:<br /><br /> user@EachMachine$ mkdir -p $HOME/hdfs/data<br /><br /> user@namenode$ mkdir -p $HOME/hdfs/name<br /><br />The user who owns the Hadoop instances will need read and write access to each of these directories. It is not necessary for all users to have access to these directories. Set permissions with chmod as appropriate. In a large-scale environment, it is recommended that you create a user named "hadoop" on each node for the express purpose of owning and running Hadoop tasks. For a single individual's machine, it is perfectly acceptable to run Hadoop under your own username. It is not recommended that you run Hadoop as root.
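Client programs consume these same settings through the Configuration class. The following is a minimal sketch, assuming the 0.20-era API; the host name and paths are placeholders like those above, and values set explicitly in code override whatever conf/hadoop-site.xml provides:<br /><br />
import org.apache.hadoop.conf.Configuration;<br />
import org.apache.hadoop.fs.FileSystem;<br />
import org.apache.hadoop.fs.Path;<br />
<br />
public class ConfigSketch {<br />
  public static void main(String[] args) throws Exception {<br />
    // Normally these values are read from conf/hadoop-site.xml on the classpath;<br />
    // a client may also set them explicitly. The host name is a placeholder.<br />
    Configuration conf = new Configuration();<br />
    conf.set("fs.default.name", "hdfs://your.server.name.com:9000");<br />
<br />
    FileSystem fs = FileSystem.get(conf);<br />
    System.out.println("Connected to: " + conf.get("fs.default.name"));<br />
<br />
    // The replication factor can also be changed per file after the fact,<br />
    // e.g. (hypothetical path):<br />
    // fs.setReplication(new Path("/user/username/data.txt"), (short) 2);<br />
  }<br />
}<br />
<br />In practice it is usually better to let clients read the cluster's hadoop-site.xml than to hard-code the NameNode address.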
<span style="font-weight:bold;">HDFS Architecture</span><br /><br />HDFS is a block-structured file system: individual files are broken into blocks of a fixed size. These blocks are stored across a cluster of one or more machines with data storage capacity. Individual machines in the cluster are referred to as DataNodes. A file can be made of several blocks, and they are not necessarily stored on the same machine; the target machines which hold each block are chosen randomly on a block-by-block basis. Thus access to a file may require the cooperation of multiple machines, but the file system supports file sizes far larger than a single-machine DFS could; individual files can require more space than a single hard drive could hold.<br /><br />If several machines must be involved in the serving of a file, then a file could be rendered unavailable by the loss of any one of those machines. HDFS combats this problem by replicating each block across a number of machines (three, by default).<br /><a onblur="try {parent.deselectBloggerImageGracefully();} catch(e) {}" href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgaGaMbcM821FppdRpru0bGAhAnpbWT1rzxtERdOumM38kR11WRSzTtrKqwKnthIXrlU9fWPw7g3VTinEDg14TcREtDBn_TxTlWUKWUzfpLjrfWbh8rUsGl-7hgU-RBWAen-8sG9os2HBw/s1600/HDFS+Architecture.png"><img style="cursor:pointer; cursor:hand;width: 400px; height: 241px;" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgaGaMbcM821FppdRpru0bGAhAnpbWT1rzxtERdOumM38kR11WRSzTtrKqwKnthIXrlU9fWPw7g3VTinEDg14TcREtDBn_TxTlWUKWUzfpLjrfWbh8rUsGl-7hgU-RBWAen-8sG9os2HBw/s400/HDFS+Architecture.png" border="0" alt="" id="BLOGGER_PHOTO_ID_5591361300565961138" /></a><br /><br />Most block-structured file systems use a block size on the order of 4 or 8 KB. By contrast, the default block size in HDFS is 64 MB -- orders of magnitude larger. This allows HDFS to decrease the amount of metadata storage required per file (the list of blocks per file will be smaller as the size of individual blocks increases). Furthermore, it allows for fast streaming reads of data, by keeping large amounts of data sequentially laid out on the disk. The consequence of this decision is that HDFS expects to have very large files, and expects them to be read sequentially. Unlike a file system such as NTFS or EXT, which sees many very small files, HDFS expects to store a modest number of very large files: hundreds of megabytes, or gigabytes, each. After all, a 100 MB file is not even two full blocks. Files on your computer may also frequently be accessed "randomly," with applications cherry-picking small amounts of information from several different locations in a file which are not sequentially laid out. By contrast, HDFS expects to read a block start-to-finish for a program. This makes it particularly useful to the MapReduce style of programming described in Module 4. That having been said, attempting to use HDFS as a general-purpose distributed file system for a diverse set of applications will be suboptimal.<br /><br />Because HDFS stores files as a set of large blocks across several machines, these files are not part of the ordinary file system. Typing ls on a machine running a DataNode daemon will display the contents of the ordinary Linux file system being used to host the Hadoop services -- but it will not include any of the files stored inside the HDFS. This is because HDFS runs in a separate namespace, isolated from the contents of your local files. The files inside HDFS (or, more accurately, the blocks that make them up) are stored in a particular directory managed by the DataNode service, but the files are named only with block ids. You cannot interact with HDFS-stored files using ordinary Linux file modification tools (e.g., ls, cp, mv, etc.). However, HDFS does come with its own utilities for file management, which act very similarly to these familiar tools. A later section in this tutorial will introduce you to these commands and their operation.<br /><br />It is important for this file system to store its metadata reliably. Furthermore, while the file data is accessed in a write-once, read-many model, the metadata structures (e.g., the names of files and directories) can be modified by a large number of clients concurrently. It is important that this information never become desynchronized. Therefore, it is all handled by a single machine, called the NameNode.
The NameNode stores all the metadata for the file system. Because of the relatively low amount of metadata per file (it only tracks file names, permissions, and the locations of each block of each file), all of this information can be stored in the main memory of the NameNode machine, allowing fast access to the metadata.<br /><br />To open a file, a client contacts the NameNode and retrieves a list of locations for the blocks that comprise the file. These locations identify the DataNodes which hold each block. Clients then read file data directly from the DataNode servers, possibly in parallel. The NameNode is not directly involved in this bulk data transfer, keeping its overhead to a minimum. (The sketch at the end of this section shows this block-location lookup through the Java API.)<br /><br />Of course, NameNode information must be preserved even if the NameNode machine fails; there are multiple redundant systems that allow the NameNode to preserve the file system's metadata even if the NameNode itself crashes irrecoverably. NameNode failure is more severe for the cluster than DataNode failure. While individual DataNodes may crash and the entire cluster will continue to operate, the loss of the NameNode will render the cluster inaccessible until it is manually restored. Fortunately, as the NameNode's involvement is relatively minimal, the odds of it failing are considerably lower than the odds of an arbitrary DataNode failing at any given point in time.
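The block-location lookup described above is visible through the Java API as well. The following is a minimal sketch, assuming the 0.20-era FileSystem interface; the file name is a hypothetical placeholder for a file already stored in HDFS:<br /><br />
import org.apache.hadoop.conf.Configuration;<br />
import org.apache.hadoop.fs.BlockLocation;<br />
import org.apache.hadoop.fs.FileStatus;<br />
import org.apache.hadoop.fs.FileSystem;<br />
import org.apache.hadoop.fs.Path;<br />
<br />
public class BlockLocationsSketch {<br />
  public static void main(String[] args) throws Exception {<br />
    Configuration conf = new Configuration();<br />
    FileSystem fs = FileSystem.get(conf);<br />
<br />
    // Hypothetical file already stored in HDFS.<br />
    Path p = new Path("/user/someone/big-data-file.txt");<br />
    FileStatus stat = fs.getFileStatus(p);<br />
<br />
    // Ask the NameNode which DataNodes hold each block of the file.<br />
    BlockLocation[] blocks = fs.getFileBlockLocations(stat, 0, stat.getLen());<br />
    for (BlockLocation block : blocks) {<br />
      System.out.print("offset " + block.getOffset() + ": ");<br />
      for (String host : block.getHosts()) {<br />
        System.out.print(host + " ");<br />
      }<br />
      System.out.println();<br />
    }<br />
  }<br />
}<br />
<br />This is essentially the same query the MapReduce framework issues when it schedules tasks close to their input blocks.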
<span style="font-weight:bold;">HDFS Introduction</span><br /><br />HDFS, the Hadoop Distributed File System, is a distributed file system designed to hold very large amounts of data (terabytes or even petabytes), and to provide high-throughput access to this information. Files are stored in a redundant fashion across multiple machines to ensure their durability in the face of failure and their high availability to highly parallel applications. This module introduces the design of this distributed file system and instructions on how to operate it.<br /><br />A distributed file system is designed to hold a large amount of data and provide access to this data to many clients distributed across a network. There are a number of distributed file systems that solve this problem in different ways.<br /><br />NFS, the Network File System, is the most ubiquitous distributed file system. It is one of the oldest still in use. While its design is straightforward, it is also very constrained. NFS provides remote access to a single logical volume stored on a single machine. An NFS server makes a portion of its local file system visible to external clients. The clients can then mount this remote file system directly into their own Linux file system, and interact with it as though it were part of the local drive.<br /><br />One of the primary advantages of this model is its transparency. Clients do not need to be particularly aware that they are working on files stored remotely. The existing standard library methods like open(), close(), fread(), etc. will work on files hosted over NFS.<br /><br />But as a distributed file system, it is limited in its power. The files in an NFS volume all reside on a single machine. This means that it will only store as much information as can be stored in one machine, and does not provide any reliability guarantees if that machine goes down (e.g., by replicating the files to other servers). Finally, as all the data is stored on a single machine, all the clients must go to this machine to retrieve their data. This can overload the server if a large number of clients must be handled. Clients must also always copy the data to their local machines before they can operate on it.<br /><br />HDFS is designed to be robust to a number of the problems that other DFSs such as NFS are vulnerable to. In particular:<br /><br /> * HDFS is designed to store a very large amount of information (terabytes or petabytes). This requires spreading the data across a large number of machines. It also supports much larger file sizes than NFS.<br /> * HDFS should store data reliably. If individual machines in the cluster malfunction, data should still be available.<br /> * HDFS should provide fast, scalable access to this information. It should be possible to serve a larger number of clients by simply adding more machines to the cluster.<br /> * HDFS should integrate well with Hadoop MapReduce, allowing data to be read and computed upon locally when possible.<br /><br />But while HDFS is very scalable, its high-performance design also restricts it to a particular class of applications; it is not as general-purpose as NFS. There are a large number of additional decisions and trade-offs that were made with HDFS. In particular:<br /><br /> * Applications that use HDFS are assumed to perform long sequential streaming reads from files. HDFS is optimized to provide streaming read performance; this comes at the expense of random seek times to arbitrary positions in files.<br /> * Data will be written to the HDFS once and then read several times; updates to files after they have already been closed are not supported. (An extension to Hadoop will provide support for appending new data to the ends of files; it is scheduled to be included in Hadoop 0.19 but is not available yet.)<br /> * Due to the large size of files, and the sequential nature of reads, the system does not provide a mechanism for local caching of data. The overhead of caching is great enough that data should simply be re-read from the HDFS source.<br /> * Individual machines are assumed to fail on a frequent basis, both permanently and intermittently. The cluster must be able to withstand the complete failure of several machines, possibly many happening at the same time (e.g., if a rack fails altogether). While performance may degrade proportionally to the number of machines lost, the system as a whole should not become overly slow, nor should information be lost. Data replication strategies combat this problem.<br /><br />The design of HDFS is based on the design of GFS, the Google File System. Its design was described in a paper published by Google.<br /><br /><span style="font-weight:bold;">The Hadoop Approach</span><br /><br />Hadoop is designed to efficiently process large volumes of information by connecting many commodity computers together to work in parallel. The theoretical 1000-CPU machine described earlier would cost a very large amount of money, far more than 1,000 single-CPU or 250 quad-core machines. Hadoop ties these smaller and more reasonably priced machines together into a single cost-effective compute cluster.<br /><br /><span style="font-weight:bold;">Comparison to Existing Techniques</span><br /><br />Performing computation on large volumes of data has been done before, usually in a distributed setting.
What makes Hadoop unique is its simplified programming model, which allows the user to quickly write and test distributed systems, and its efficient, automatic distribution of data and work across machines, which in turn exploits the underlying parallelism of the CPU cores.<br /><br />Grid scheduling of computers can be done with existing systems such as Condor. But Condor does not automatically distribute data: a separate SAN must be managed in addition to the compute cluster. Furthermore, collaboration between multiple compute nodes must be managed with a communication system such as MPI. This programming model is challenging to work with and can lead to the introduction of subtle errors.<br /><br /><span style="font-weight:bold;">Data Distribution</span><br /><br />In a Hadoop cluster, data is distributed to all the nodes of the cluster as it is being loaded in. The Hadoop Distributed File System (HDFS) will split large data files into chunks which are managed by different nodes in the cluster. In addition, each chunk is replicated across several machines, so that a single machine failure does not result in any data being unavailable. An active monitoring system then re-replicates the data in response to system failures that would otherwise leave blocks under-replicated. Even though the file chunks are replicated and distributed across several machines, they form a single namespace, so their contents are universally accessible.<br /><br />Data is conceptually record-oriented in the Hadoop programming framework. Individual input files are broken into lines or into other formats specific to the application logic. Each process running on a node in the cluster then processes a subset of these records. The Hadoop framework schedules these processes in proximity to the location of data/records using knowledge from the distributed file system. Since files are spread across the distributed file system as chunks, each compute process running on a node operates on a subset of the data. Which data a node operates on is chosen based on its locality to the node: most data is read from the local disk straight into the CPU, alleviating strain on network bandwidth and preventing unnecessary network transfers. This strategy of moving computation to the data, instead of moving the data to the computation, allows Hadoop to achieve high data locality, which in turn results in high performance.<br /><br /><a onblur="try {parent.deselectBloggerImageGracefully();} catch(e) {}" href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhgjiHIe42QQnTAb7nj1FHnBJKg94tjgjDkc-B69vkIysA_c4nm1pqroV84IBzcfLYFv0g1dp3QiTDiM72HM2DF3ST3sj-mbqCkVTtYKMDyntcyh035O_lbExnaGDaLZo7GUPLe9o9RA7o/s1600/Hadoop+Approach.png"><img style="cursor:pointer; cursor:hand;width: 400px; height: 242px;" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhgjiHIe42QQnTAb7nj1FHnBJKg94tjgjDkc-B69vkIysA_c4nm1pqroV84IBzcfLYFv0g1dp3QiTDiM72HM2DF3ST3sj-mbqCkVTtYKMDyntcyh035O_lbExnaGDaLZo7GUPLe9o9RA7o/s400/Hadoop+Approach.png" border="0" alt="" id="BLOGGER_PHOTO_ID_5591359530267028946" /></a><br /><br /><br /><span style="font-weight:bold;">MapReduce: Isolated Processes</span><br /><br />Hadoop limits the amount of communication which can be performed by the processes, as each individual record is processed by a task in isolation from the others. While this sounds like a major limitation at first, it makes the whole framework much more reliable. Hadoop will not run just any program and distribute it across a cluster.
Programs must be written to conform to a particular programming model, named "MapReduce."<br /><br />In MapReduce, records are processed in isolation by tasks called Mappers. The output from the Mappers is then brought together into a second set of tasks called Reducers, where results from different mappers can be merged together. (A minimal sketch of a Mapper and a Reducer appears at the end of this section.)<br /><br /><a onblur="try {parent.deselectBloggerImageGracefully();} catch(e) {}" href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjCmPt2hnCnUu2IHMeD1eTatTEn10XY_9FnrymlJJu9L4SozjVZhpe_zF-GgwAlDb8HoTRtbwh8GSGHBU6hvOL-f6dcxw4QDbR3SshjNyq0Kr2TbVGxrdqveXWM2D4ouM_M8kWjdDtln6o/s1600/Hadoop+MapReduce.png"><img style="cursor:pointer; cursor:hand;width: 400px; height: 259px;" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjCmPt2hnCnUu2IHMeD1eTatTEn10XY_9FnrymlJJu9L4SozjVZhpe_zF-GgwAlDb8HoTRtbwh8GSGHBU6hvOL-f6dcxw4QDbR3SshjNyq0Kr2TbVGxrdqveXWM2D4ouM_M8kWjdDtln6o/s400/Hadoop+MapReduce.png" border="0" alt="" id="BLOGGER_PHOTO_ID_5591360011254446402" /></a><br /><br />Separate nodes in a Hadoop cluster still communicate with one another. However, in contrast to more conventional distributed systems, where application developers explicitly marshal byte streams from node to node over sockets or through MPI buffers, communication in Hadoop is performed implicitly. Pieces of data can be tagged with key names which inform Hadoop how to send related bits of information to a common destination node. Hadoop internally manages all of the data transfer and cluster topology issues.<br /><br />By restricting the communication between nodes, Hadoop makes the distributed system much more reliable. Individual node failures can be worked around by restarting tasks on other machines. Since user-level tasks do not communicate explicitly with one another, no messages need to be exchanged by user programs, nor do nodes need to roll back to pre-arranged checkpoints to partially restart the computation. The other workers continue to operate as though nothing went wrong, leaving the challenging aspects of partially restarting the program to the underlying Hadoop layer.
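To make the Mapper and Reducer roles above concrete, here is a minimal word-count sketch in the style of the 0.20-era org.apache.hadoop.mapred API; the class names are illustrative, and the job-submission boilerplate is omitted:<br /><br />
import java.io.IOException;<br />
import java.util.Iterator;<br />
import java.util.StringTokenizer;<br />
<br />
import org.apache.hadoop.io.IntWritable;<br />
import org.apache.hadoop.io.LongWritable;<br />
import org.apache.hadoop.io.Text;<br />
import org.apache.hadoop.mapred.MapReduceBase;<br />
import org.apache.hadoop.mapred.Mapper;<br />
import org.apache.hadoop.mapred.OutputCollector;<br />
import org.apache.hadoop.mapred.Reducer;<br />
import org.apache.hadoop.mapred.Reporter;<br />
<br />
// Mapper: processes each record (here, a line of text) in isolation.<br />
public class WordCountMapper extends MapReduceBase<br />
    implements Mapper<LongWritable, Text, Text, IntWritable> {<br />
  private final static IntWritable ONE = new IntWritable(1);<br />
  private final Text word = new Text();<br />
<br />
  public void map(LongWritable key, Text value,<br />
      OutputCollector<Text, IntWritable> output, Reporter reporter)<br />
      throws IOException {<br />
    StringTokenizer itr = new StringTokenizer(value.toString());<br />
    while (itr.hasMoreTokens()) {<br />
      word.set(itr.nextToken());<br />
      output.collect(word, ONE); // tag each word with a count of 1<br />
    }<br />
  }<br />
}<br />
<br />
// Reducer: Hadoop routes all values sharing a key to the same reduce call.<br />
class WordCountReducer extends MapReduceBase<br />
    implements Reducer<Text, IntWritable, Text, IntWritable> {<br />
  public void reduce(Text key, Iterator<IntWritable> values,<br />
      OutputCollector<Text, IntWritable> output, Reporter reporter)<br />
      throws IOException {<br />
    int sum = 0;<br />
    while (values.hasNext()) {<br />
      sum += values.next().get();<br />
    }<br />
    output.collect(key, new IntWritable(sum));<br />
  }<br />
}<br />
<br />Note that map() sees one record at a time and never talks to other tasks; the only "communication" is the implicit routing of each (word, count) pair to the reducer responsible for that word.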
<span style="font-weight:bold;">Flat Scalability</span><br /><br />One of the major benefits of using Hadoop in contrast to other distributed systems is its flat scalability curve. Executing Hadoop on a limited amount of data on a small number of nodes may not demonstrate particularly stellar performance, as the overhead involved in starting Hadoop programs is relatively high. Other parallel/distributed programming paradigms, such as MPI (Message Passing Interface), may perform much better on two, four, or perhaps a dozen machines. Though the effort of coordinating work among a small number of machines may be better performed by such systems, the price paid in performance and engineering effort (when adding more hardware as a result of increasing data volumes) increases non-linearly.<br /><br />A program written in distributed frameworks other than Hadoop may require large amounts of refactoring when scaling from ten to one hundred or one thousand machines. This may involve having the program rewritten several times; fundamental elements of its design may also put an upper bound on the scale to which the application can grow.<br /><br />Hadoop, however, is specifically designed to have a very flat scalability curve. After a Hadoop program is written and functioning on ten nodes, very little--if any--work is required for that same program to run on a much larger amount of hardware. Orders of magnitude of growth can be managed with little re-work required for your applications. The underlying Hadoop platform will manage the data and hardware resources and provide dependable performance growth proportionate to the number of machines available.