Wednesday, August 20, 2008

Hadoop as a Batch Job using PBS

During my previous data analyses using Hadoop and CGL-MapReduce, I had to use compute resources accessible only via a job queue. For this purpose I used the Quarry cluster at Indiana University, which supports batch job submissions via the Portable Batch System (PBS). I contacted one of the Quarry system administrators (George Wm Turner), and he pointed me to the Hadoop On Demand (HOD) project of Apache, which tries to solve exactly the problem I was facing.

We gave HOD a serious try but could not get it working the way we wanted. What we tried was to install HOD on a set of nodes and let users use it on demand via a job queue. This simply did not work for multiple users, since the file system configuration options overlapped between users, allowing only one user to run Hadoop at a given time.

Given this situation, I decided to try starting Hadoop dynamically using PBS. The tasks the script should perform are as follows.

1. Identify the master node
2. Identify the slave nodes
3. Update $HADOOP_HOME/conf/masters and $HADOOP_HOME/conf/slaves files
4. Update the $HADOOP_HOME/conf/hadoop-site.xml
5. Cleanup any hadoop file system specific directories created in previous runs
6. Format a new Hadoop Distributed File System (HDFS)
7. Start Hadoop daemons
8. Execute the map-reduce computation
9. Stop Daemons
10. Stop HDFS

Although the list is a bit long, most of the tasks are straightforward to perform in a shell script.

The first problem I faced was finding the IP addresses of the nodes. PBS passes this information via the variable PBS_NODEFILE. However, on multi-core and multi-processor systems, PBS_NODEFILE contains multiple entries for the same node, depending on the number of processors requested on each node. So I had to find the "SET" of node addresses, eliminating the duplicates, and then update the configuration files based on this information. I decided to use a Java program to do the job (with my shell scripting knowledge I could not find an easy way to do a "SET" operation).

Once I had this simple Java program to perform steps 1-4, the rest was straightforward. Here is my PBS script; the link after the script points to the simple Java program that updates the configuration files.

----------------------------------------------------------
#!/bin/bash
#PBS -l nodes=5:ppn=8
#PBS -l walltime=01:00:00
#PBS -N hdhep
#PBS -q hod

# Generate the masters, slaves, and hadoop-site.xml files from the PBS node list
java -cp ~/hadoop_root/bin HadoopConfig $PBS_NODEFILE ~/hadoop-0.17.0/conf

# Clean up HDFS directories left over from previous runs on every node
for line in `cat $PBS_NODEFILE`; do
echo $line
ssh $line rm -rf /tmp/hadoop*
done

# The first node in the node file acts as the master
var=`head -1 $PBS_NODEFILE`
echo $var

# Format a fresh HDFS and start the Hadoop daemons from the master node
ssh -x $var ~/hadoop-0.17.0/bin/hadoop namenode -format
ssh -x $var ~/hadoop-0.17.0/bin/start-dfs.sh
sleep 60
ssh -x $var ~/hadoop-0.17.0/bin/start-mapred.sh
sleep 60

# Run the MapReduce computation, then stop the daemons
ssh -x $var ~/hadoop-0.17.0/bin/hadoop jar ~/hadoop-0.17.0/hep.jar hep ~/hadoop_root /N/dc/scratch/jaliya/alldata 40 2
ssh -x $var ~/hadoop-0.17.0/bin/stop-mapred.sh
ssh -x $var ~/hadoop-0.17.0/bin/stop-dfs.sh
-------------------------------------------------------------------
[HadoopConfig.java]
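
For readers who just want the idea, here is a minimal sketch of what such a helper might look like. It is not the exact program linked above; the port numbers and the exact set of properties it writes into hadoop-site.xml are assumptions for illustration.

----------------------------------------------------------
import java.io.*;
import java.util.*;

// A minimal HadoopConfig-style helper (sketch only; the real program linked
// above may differ). Usage: java HadoopConfig <pbs_nodefile> <hadoop_conf_dir>
public class HadoopConfig {
    public static void main(String[] args) throws IOException {
        // Read the PBS node file and keep only the unique node names, preserving order
        Set<String> nodes = new LinkedHashSet<String>();
        BufferedReader in = new BufferedReader(new FileReader(args[0]));
        String line;
        while ((line = in.readLine()) != null) {
            line = line.trim();
            if (line.length() > 0) {
                nodes.add(line);
            }
        }
        in.close();

        String confDir = args[1];
        String master = nodes.iterator().next(); // first unique node acts as the master

        // conf/masters holds the master; conf/slaves lists every node
        writeLines(confDir + "/masters", Collections.singleton(master));
        writeLines(confDir + "/slaves", nodes);

        // Point the NameNode and JobTracker addresses at the master
        // (ports 9000/9001 are assumptions; the real file may carry more properties)
        PrintWriter site = new PrintWriter(new FileWriter(confDir + "/hadoop-site.xml"));
        site.println("<?xml version=\"1.0\"?>");
        site.println("<configuration>");
        site.println("  <property><name>fs.default.name</name>"
                + "<value>hdfs://" + master + ":9000</value></property>");
        site.println("  <property><name>mapred.job.tracker</name>"
                + "<value>" + master + ":9001</value></property>");
        site.println("</configuration>");
        site.close();
    }

    private static void writeLines(String path, Collection<String> lines) throws IOException {
        PrintWriter out = new PrintWriter(new FileWriter(path));
        for (String l : lines) {
            out.println(l);
        }
        out.close();
    }
}
----------------------------------------------------------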

As you can see in the script, I use ssh to log into each node of the cluster and clean up the HDFS directories left from previous runs. Then I use ssh to log into the master node and start the Hadoop daemons.

Next comes the actual execution of the data analysis task, which I have coded in hep.jar.
After the MapReduce computation is over, the remaining commands simply stop the daemons.

This method gives more flexibility to the user and requires no changes to the batch job scheduling system. It also serves as an easy option when MapReduce computations make up only a small fraction of the batch jobs.

However, if the cluster is dedicated to running MapReduce computations, then having every user start and stop an HDFS file system does not make sense. Ideally, Hadoop should be started by the administrators, and the users should simply be allowed to execute their MapReduce computations on it.

Your comments are welcome!

High Energy Physics Data Analysis Using Hadoop and CGL-MapReduce

After the previous set of tests with parallel Kmeans clustering using CGL-MapReduce, Hadoop, and MPI, I shifted the testing in another direction. This time, the task is to process a large number (and volume) of High Energy Physics data files and produce a histogram of interesting events. The amount of data that needs to be processed is 1 terabyte.

Converting this data analysis into a MapReduce version is straightforward. First, the data is split into manageable chunks; each map task processes some of these chunks and produces a histogram of interesting events. Reduce tasks merge the resulting histograms, producing fewer, combined histograms. Finally, a merge operation combines all the histograms produced by the reduce tasks.
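
Conceptually, the decomposition looks like the following sketch (plain Java, leaving out the actual Hadoop/CGL-MapReduce APIs and the real event-selection code; the bin layout and the isInteresting test are placeholders).

----------------------------------------------------------
import java.util.*;

// Conceptual sketch of the histogramming decomposition. The bin layout and
// the "interesting event" test below are placeholders, not the real physics code.
public class HistogramSketch {
    static final int BINS = 100;

    // Map: process one chunk of events and produce a local histogram
    static int[] map(double[] chunk) {
        int[] histogram = new int[BINS];
        for (double event : chunk) {
            if (isInteresting(event)) {
                int bin = (int) Math.min(BINS - 1, Math.max(0, event * BINS));
                histogram[bin]++;
            }
        }
        return histogram;
    }

    // Reduce / merge: combine histograms by adding the counts bin by bin
    static int[] merge(List<int[]> histograms) {
        int[] combined = new int[BINS];
        for (int[] h : histograms) {
            for (int i = 0; i < BINS; i++) {
                combined[i] += h[i];
            }
        }
        return combined;
    }

    // Placeholder for the real event-selection logic
    static boolean isInteresting(double event) {
        return event >= 0.0 && event < 1.0;
    }
}
----------------------------------------------------------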

I performed the above test by increasing the data size on a fixed set of compute nodes, using both CGL-MapReduce and Hadoop. To see the scalability of the MapReduce approach and of the two MapReduce implementations, I performed another test, fixing the amount of data at 100GB and varying the number of compute nodes. Figure 1 and Figure 2 show my findings.

Figure 1. HEP data analysis, execution time vs. the volume of data (fixed compute resources)

Figure 2. Total time vs. the number of compute nodes (fixed data)

Hadoop and CGL-MapReduce show similar performance. The amount of data accessed in each analysis is extremely large, and hence the performance is limited by the I/O bandwidth of a given node rather than by the total number of processor cores. The overhead introduced by the MapReduce implementations has a negligible effect on the overall computation.

The results in Figure 2 show the scalability of the MapReduce technique and of the two implementations. They also show how the performance gain obtained by parallelism diminishes after a certain number of compute nodes for this particular data set.