Thursday, December 22, 2011

Run Hadoop under the Lava Workload Scheduler

Other titles:
Run Hadoop in a constrained environment
Run Hadoop in a shared environment with only a user account
Run Hadoop with LSF

The opportunity.neu.edu cluster at Northeastern University doesn't have Hadoop yet, and I don't expect it to get Hadoop in the near future. So I wondered whether I could run Hadoop as a job under Lava. The plan is as follows:
Step 1: run Hadoop in standalone (single-node) mode as a job.
Step 2: run Hadoop as a parallel job, starting a tasktracker on each assigned node (MapReduce).
Step 3: run Hadoop with the full setup, adding a datanode on each assigned node (HDFS).

Step 1: This goal is simple enough. Just follow the Getting Started guide at http://hadoop.apache.org/common/docs/current/#Getting+Started
and you should be OK. I tried the pi example on the front node and it worked.

Step 2: I want to try out MapReduce first, for several reasons. First, most algorithms I commonly use are compute-oriented. Second, MapReduce doesn't need HDFS; it can run on the local file system, and if something goes wrong, it's easier to kill processes than to clean up files. Third, once we understand how to make MapReduce work, getting HDFS running should be easy.

After spending a day searching the Web and trying out different configurations, I got MapReduce running on opportunity. Below is the Python script I submit with bsub, followed by the Hadoop configuration:

bash$ bsub -J hadoop_test -o /home/yguan/ExpJobs/Jobs/exp103/log/hadoop_test.log -n 12 /home/yguan/ExpJobs/Jobs/exp103/template/hadoop_test.py

hadoop_test.py:

#!/usr/bin/python
# Start MapReduce (local file system, no HDFS) on the hosts LSF/Lava
# allocated to this job, run one example job, then shut MapReduce down.

import os
import platform
import subprocess
import sys


def run(command):
    """Run a shell command; its stdout/stderr go straight into the job log."""
    print(command)
    sys.stdout.flush()
    return subprocess.call(command, shell=True)


def replace_value(fileName, key, value):
    """Set the <value> of property `key` in a Hadoop config file.

    Assumes the <value> line immediately follows the <name> line.
    """
    backName = fileName + ".back"
    valueLine = False
    with open(fileName) as fhr, open(backName, 'w') as fhw:
        for line in fhr:
            if "<name>" + key + "</name>" in line:
                valueLine = True
            elif valueLine:
                valueLine = False
                fhw.write("        <value>" + value + "</value>\n")
                continue
            fhw.write(line)
    os.rename(backName, fileName)


templateDir = sys.path[0]          # directory containing this script
print(templateDir)
confDir = os.path.join(templateDir, 'hadoop_conf')

# LSB_HOSTS repeats a host once per allocated slot; keep each host once,
# since one tasktracker per machine is enough.
hostList = sorted(set(os.environ["LSB_HOSTS"].split()))

# write the slaves file read by start-mapred.sh
with open(os.path.join(confDir, 'slaves'), 'w') as fh:
    for host in hostList:
        fh.write(host + '\n')

# the jobtracker runs on the node executing this script
nodeName = platform.node()
replace_value(os.path.join(confDir, "mapred-site.xml"),
              "mapred.job.tracker", nodeName + ":42312")

hadoop_config = " --config " + confDir
os.chdir(os.environ["HADOOP_HOME"])

# start the jobtracker here and a tasktracker on every host in slaves
run("bin/start-mapred.sh" + hadoop_config)

# run the hadoop job: pi example with 100 maps, 100 samples per map
run("bin/hadoop" + hadoop_config + " jar hadoop-0.20.2-examples.jar pi 100 100")

# shut down the jobtracker and tasktrackers
run("bin/stop-mapred.sh" + hadoop_config)

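hadoop_conf/core-site.xml:

<configuration>
   <property>
        <name>hadoop.tmp.dir</name>
        <value>/scratch_global/hadoop-yguan/tmp</value>
   </property>
   <property>
        <name>fs.default.name</name>
        <value>file:///</value>
   </property>
</configuration>
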

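hadoop_conf/mapred-site.xml:

<configuration>
   <property>
        <name>mapred.child.tmp</name>
        <value>/scratch/hadoop-yguan/tmp</value>
   </property>
   <property>
        <name>mapred.system.dir</name>
        <value>/scratch_global/hadoop-yguan/tmp/mapred/system</value>
   </property>
   <property>
        <name>mapred.local.dir</name>
        <value>/scratch/hadoop-yguan/tmp/mapred/local</value>
   </property>
   <property>
        <name>mapred.job.tracker</name>
        <value>compute-2-11.local:42312</value>
   </property>
   <property>
        <name>mapred.tasktracker.map.tasks.maximum</name>
        <value>4</value>
   </property>
   <property>
        <name>mapred.tasktracker.reduce.tasks.maximum</name>
        <value>4</value>
   </property>
</configuration>
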
The workflow here is simple enough. First, we get the list of slave nodes from the LSB_HOSTS environment variable, which LSF/Lava sets to the hosts allocated to the job. Then we remove duplicate host names: LSB_HOSTS lists a host once per allocated slot, but we only need one datanode/tasktracker per machine, and mapred.tasktracker.{map|reduce}.tasks.maximum determines how many task JVMs run in parallel on each machine. Next, we set the value of mapred.job.tracker in mapred-site.xml to the name of the node our job script is running on. Now it's time to call bin/start-mapred.sh, which starts the jobtracker on this node and a tasktracker on every node listed in the slaves file. Then we can run our Hadoop job. After the job is done, we call bin/stop-mapred.sh to stop the jobtracker and tasktrackers.
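The line-based rewrite in the script assumes each <value> line directly follows its <name> line. A less layout-dependent alternative is to edit the configuration as XML. Below is a sketch using Python's standard xml.etree.ElementTree to set a property by name. The function name set_property is hypothetical and not part of Hadoop:

```python
import xml.etree.ElementTree as ET


def set_property(xml_text, name, value):
    """Return Hadoop-style config XML with property `name` set to `value`.

    Adds the property if it is missing. Hypothetical helper for illustration.
    """
    root = ET.fromstring(xml_text)          # the <configuration> element
    for prop in root.findall("property"):
        if (prop.findtext("name") or "").strip() == name:
            value_el = prop.find("value")
            if value_el is None:            # property without a <value> child
                value_el = ET.SubElement(prop, "value")
            value_el.text = value
            break
    else:
        # no matching property found: append a new one
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root).decode()
```

For a file on disk, the same loop works on ET.parse(path).getroot(), followed by tree.write(path). Note that ElementTree drops XML comments and reflows nothing else, so the rewritten file stays readable.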

Step 3: Now it's time to get both MapReduce and HDFS working. One thing HDFS needs that MapReduce doesn't is that the file system must be formatted first. When you run the shell command "bin/hadoop namenode -format", it asks you to type "Y" to confirm. So I play a dirty trick here: I keep a file containing "Y" and a newline and redirect it into the format command as its input. So here is the new version of hadoop_test.py:


#!/usr/bin/python
# Start HDFS and MapReduce on the hosts LSF/Lava allocated to this job,
# run one example job, then shut both down.

import os
import platform
import subprocess
import sys


def run(command):
    """Run a shell command; its stdout/stderr go straight into the job log."""
    print(command)
    sys.stdout.flush()
    return subprocess.call(command, shell=True)


def replace_value(fileName, key, value):
    """Set the <value> of property `key` in a Hadoop config file.

    Assumes the <value> line immediately follows the <name> line.
    """
    backName = fileName + ".back"
    valueLine = False
    with open(fileName) as fhr, open(backName, 'w') as fhw:
        for line in fhr:
            if "<name>" + key + "</name>" in line:
                valueLine = True
            elif valueLine:
                valueLine = False
                fhw.write("        <value>" + value + "</value>\n")
                continue
            fhw.write(line)
    os.rename(backName, fileName)


templateDir = sys.path[0]          # directory containing this script
print(templateDir)
confDir = os.path.join(templateDir, 'hadoop_conf')

# one datanode/tasktracker per allocated host
hostList = sorted(set(os.environ["LSB_HOSTS"].split()))
with open(os.path.join(confDir, 'slaves'), 'w') as fh:
    for host in hostList:
        fh.write(host + '\n')

# the jobtracker and the namenode both run on the node executing this script
nodeName = platform.node()
replace_value(os.path.join(confDir, "mapred-site.xml"),
              "mapred.job.tracker", nodeName + ":42312")
replace_value(os.path.join(confDir, "core-site.xml"),
              "fs.default.name", "hdfs://" + nodeName)

hadoop_config = " --config " + confDir
os.chdir(os.environ["HADOOP_HOME"])

# format the newly created HDFS; "Y" is a file in HADOOP_HOME (the current
# directory) holding "Y" and a newline, which answers the confirmation prompt
run("bin/hadoop" + hadoop_config + " namenode -format < Y")

# start the namenode here and a datanode on every host in slaves
run("bin/start-dfs.sh" + hadoop_config)

# start the jobtracker here and a tasktracker on every host in slaves
run("bin/start-mapred.sh" + hadoop_config)

# run the hadoop job: pi example with 100 maps, 100 samples per map
run("bin/hadoop" + hadoop_config + " jar hadoop-0.20.2-examples.jar pi 100 100")

# shut down MapReduce first, then HDFS
run("bin/stop-mapred.sh" + hadoop_config)
run("bin/stop-dfs.sh" + hadoop_config)

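hadoop_conf/core-site.xml:

<configuration>
   <property>
        <name>hadoop.tmp.dir</name>
        <value>/scratch/hadoop-yguan/tmp</value>
   </property>
   <property>
        <name>fs.default.name</name>
        <value>hdfs://compute-2-16.local</value>
   </property>
</configuration>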

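hadoop_conf/hdfs-site.xml:

<configuration>
   <property>
        <name>dfs.name.dir</name>
        <value>/scratch/yguan/hadoop/name</value>
   </property>
   <property>
        <name>dfs.data.dir</name>
        <value>/scratch/yguan/hadoop/data</value>
   </property>
</configuration>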
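The format step in the script answers the confirmation prompt by redirecting from a file named Y. Another option is to write the answer to the command's standard input directly. Below is a minimal sketch using subprocess.Popen.communicate. The helper name run_with_answer is hypothetical, and the sketch assumes the prompt reads its answer from stdin, which the `< Y` redirection already relies on:

```python
import subprocess


def run_with_answer(args, answer="Y"):
    """Run `args`, send `answer` plus a newline on stdin.

    Returns (exit code, combined stdout/stderr text). Hypothetical helper.
    """
    proc = subprocess.Popen(args, stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # communicate() writes stdin and drains the output together, so a
    # chatty command cannot block on a full pipe
    out, _ = proc.communicate((answer + "\n").encode())
    return proc.returncode, out.decode(errors="replace")
```

In the job script this would be called as run_with_answer(["bin/hadoop", "--config", confDir, "namenode", "-format"]). Passing an argument list also avoids going through the shell, so paths with spaces need no quoting.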

There are some limitations in this setup. For example, the maximum number of tasks running in parallel on each machine is set to a constant (4 map and 4 reduce tasks). In fact, we could set it to the number of slots LSF assigns on each host, but this requires each tasktracker node to have its own configuration file.
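The per-host slot count is already available in LSB_HOSTS, because LSF repeats a host name once for each slot allocated on it. Below is a sketch that extracts the counts with collections.Counter and renders a per-host mapred-site.xml fragment. The function names are hypothetical. Distributing the generated file into a per-node configuration directory is not shown.

As with the constant 4/4 setting above, this gives map and reduce tasks the full slot count each. Splitting the slots between the two is an equally valid choice.

```python
from collections import Counter


def slots_per_host(lsb_hosts):
    """Map each host in an LSB_HOSTS-style string to its number of slots."""
    return dict(Counter(lsb_hosts.split()))


def tasktracker_fragment(slots):
    """Return mapred-site.xml properties capping map/reduce tasks at `slots`."""
    props = []
    for name in ("mapred.tasktracker.map.tasks.maximum",
                 "mapred.tasktracker.reduce.tasks.maximum"):
        props.append("   <property>\n"
                     "        <name>%s</name>\n"
                     "        <value>%d</value>\n"
                     "   </property>" % (name, slots))
    return "\n".join(props)
```

For example, slots_per_host(os.environ["LSB_HOSTS"]) yields one entry per host. Each count would then be passed to tasktracker_fragment when writing that host's own configuration.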

At this stage, both Hadoop components can be started and stopped within a single job session.
Note that Platform Computing, which produces LSF, already has its own Hadoop support.
Note also that Hadoop has its own scheduler.
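Because the daemons live on the allocated nodes only for the lifetime of the job, the stop scripts should run even if the script raises an exception partway through. Otherwise, leftover jobtracker, tasktracker, or datanode processes can linger on shared compute nodes.

Below is a minimal sketch of the start/run/stop ordering using try/finally. The names hadoop_session and shell are hypothetical. The runner is injectable so that the ordering can be checked without Hadoop.

```python
import subprocess


def shell(command):
    """Default runner: execute a shell command string, return its exit code."""
    return subprocess.call(command, shell=True)


def hadoop_session(job, config, runner=shell):
    """Start HDFS and MapReduce, run `job`, and always stop what was started.

    `job` is the argument string passed to bin/hadoop; returns its exit code.
    """
    opt = " --config " + config
    started = []
    try:
        for daemon in ("dfs", "mapred"):
            runner("bin/start-%s.sh%s" % (daemon, opt))
            started.append(daemon)
        return runner("bin/hadoop%s %s" % (opt, job))
    finally:
        # stop in reverse order: MapReduce first, then HDFS
        for daemon in reversed(started):
            runner("bin/stop-%s.sh%s" % (daemon, opt))
```

Usage from $HADOOP_HOME: hadoop_session("jar hadoop-0.20.2-examples.jar pi 100 100", confDir).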

However, I ran into some errors after running this script several times. When you format the namenode, the datanodes are not aware of it. Since we don't change the path where each datanode stores its data, you will get an error like this:
java.io.IOException: Incompatible namespaceIDs
There are two solutions. You can remove the datanode's storage directory completely, which is fine in our case, or you can update the namespaceID on the datanode. Searching the Web for "namespaceIDs" will turn up the details. To remove the directory, we can issue
ssh datanode_name rm -rf /path/to/datanode/storage
for every datanode.
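This cleanup can be folded into the job script so that it runs before the namenode is formatted. Below is a sketch that builds one `ssh host rm -rf dir` command per host from the deduplicated host list. The names wipe_commands and wipe_datanodes and the dry_run flag are hypothetical. data_dir should be the dfs.data.dir value from hdfs-site.xml (/scratch/yguan/hadoop/data in the configuration above). The sketch assumes passwordless ssh to the compute nodes, which start-dfs.sh relies on anyway.

```python
import subprocess


def wipe_commands(hosts, data_dir):
    """Build one `ssh host rm -rf data_dir` argument list per host."""
    # guard against an empty or root path before anything destructive runs
    if not data_dir or data_dir == "/":
        raise ValueError("refusing to wipe %r" % data_dir)
    return [["ssh", host, "rm", "-rf", data_dir] for host in hosts]


def wipe_datanodes(hosts, data_dir, dry_run=True):
    """Print (dry_run) or execute the wipe commands; return the exit codes."""
    codes = []
    for cmd in wipe_commands(hosts, data_dir):
        if dry_run:
            print(" ".join(cmd))
            codes.append(0)
        else:
            codes.append(subprocess.call(cmd))
    return codes
```

The dry-run default is deliberate. Check the printed commands once before calling it with dry_run=False.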

Another error comes up when the datanodes are still starting while the namenode already accepts requests. The job then fails with "file could only be replicated to 0 nodes, instead of 1". The workaround is to wait a few seconds before submitting the job, because right now there is no easy way to check HDFS health programmatically, especially datanode status. On opportunity.neu.edu, for example, the wait can be as long as 30 seconds during rush hour. (Update: with 0.20, this error sometimes occurred even when I had waited long enough for the datanodes. After upgrading to a newer version, the problem seems to have gone away.)
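One way to replace the fixed delay is to poll the namenode until the expected number of datanodes has registered. This removes the guesswork about the delay. Per the update above, it may not cure every occurrence of the error on 0.20.

The sketch below assumes that the output of `bin/hadoop dfsadmin -report` contains a line such as "Datanodes available: N ..." (0.20-era wording) or "Live datanodes (N):" (later releases). Check your version's actual output. The names live_datanodes and wait_for_datanodes are hypothetical.

```python
import re
import subprocess
import time

# assumed report wordings; adjust to the Hadoop version in use
_PATTERNS = (re.compile(r"Datanodes available:\s*(\d+)"),
             re.compile(r"Live datanodes\s*\((\d+)\)"))


def live_datanodes(report):
    """Return the live-datanode count found in dfsadmin -report text, or 0."""
    for pattern in _PATTERNS:
        match = pattern.search(report)
        if match:
            return int(match.group(1))
    return 0


def wait_for_datanodes(expected, report_cmd, timeout=120, interval=5):
    """Poll `report_cmd` (a shell string) until `expected` datanodes are live.

    Returns True on success, False if `timeout` seconds pass first.
    """
    deadline = time.time() + timeout
    while True:
        proc = subprocess.Popen(report_cmd, shell=True,
                                stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        out, _ = proc.communicate()
        if live_datanodes(out.decode(errors="replace")) >= expected:
            return True
        if time.time() >= deadline:
            return False
        time.sleep(interval)
```

In the job script, this could run between start-dfs.sh and the job submission. For example: wait_for_datanodes(len(hostList), "bin/hadoop" + hadoop_config + " dfsadmin -report").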

There is plenty more to study about Hadoop. For example, we can get familiar with HDFS from the information at:
http://hadoop.apache.org/common/docs/r0.20.0/hdfs_shell.html
