run Hadoop in constrained environment
run Hadoop in shared environment with only a user account
run Hadoop with LSF
The opportunity.neu.edu cluster at Northeastern University doesn't have Hadoop installed yet, and I don't expect that to change in the near future. So I wondered whether I could run Hadoop as a job under Lava. The plan is:
Step 1: run Hadoop in standalone (single-node) mode as a job.
Step 2: run Hadoop as a parallel job after starting a tasktracker on each assigned node (MapReduce).
Step 3: run Hadoop with the full setup, adding a datanode on each assigned node (HDFS).
Step 1: This goal is simple enough. Just follow the Getting Started guide at http://hadoop.apache.org/common/docs/current/#Getting+Started and you should be OK. I tried the pi example on the front node and it worked.
Step 2: I wanted to try MapReduce first, for several reasons. First, most of the algorithms I commonly use are compute-oriented. Second, MapReduce does not need HDFS; I can run it on the local file system, and if something goes wrong it is easier to kill processes than to clean up files. Third, getting HDFS running should be easy once we understand how to make MapReduce work.
After spending a day searching the Web and trying out different configurations, I can run MapReduce on opportunity. First of all, here is the Python script I submit using bsub, together with the Hadoop configuration:
bash$ bsub -J hadoop_test -o /home/yguan/ExpJobs/Jobs/exp103/log/hadoop_test.log -n 12 /home/yguan/ExpJobs/Jobs/exp103/template/hadoop_test.py
hadoop_test.py:
#!/usr/bin/python
import os
import sys
templateDir = sys.path[0]
print(templateDir)
machineFileDir = os.path.join(templateDir, 'hadoop_conf/slaves')
configDir = os.path.join(templateDir, 'hadoop')
hosts = os.environ["LSB_HOSTS"]
hostList = hosts.split(' ')
hostList = list(set(hostList)) # remove duplicate host
# write to slaves files
fh = open(machineFileDir, 'w')
for host in hostList:
    fh.write(host)
    fh.write('\n')
fh.close()
# get the network address for the jobtracker.
import platform
nodeName = platform.node()
mapredFileName = os.path.join(templateDir, "hadoop_conf/mapred-site.xml")
mapredFileNameBack = os.path.join(templateDir, "hadoop_conf/mapred-site.xml.back")
fhr = open(mapredFileName, 'r')
fhw = open(mapredFileNameBack, 'w')
# here we assume the line right after the mapred.job.tracker line
# is the value line.
mapredJobTrackerLine = False
for line in fhr:
    if "mapred.job.tracker<" in line:
        mapredJobTrackerLine = True
    elif mapredJobTrackerLine == True:
        mapredJobTrackerLine = False
        fhw.write("
        continue
    fhw.write(line)
fhr.close()
fhw.close()
os.remove(mapredFileName)
os.rename(mapredFileNameBack, mapredFileName)
# start hadoop/MapReduce part
hadoop_config = " --config " + os.path.join(templateDir, 'hadoop_conf')
hadoop_home = os.environ["HADOOP_HOME"]
os.chdir(hadoop_home)
startHadoop_comm = "bin/start-mapred.sh "
[input, output, err] = os.popen3(startHadoop_comm + hadoop_config)
for line in output:
    print line
for line in err:
    print line
# run the hadoop job
hadoop_command = "hadoop "
hadoop_run = " jar hadoop-0.20.2-examples.jar pi 100 100"
#command = "bin/hadoop jar hadoop-0.20.2-examples.jar pi 2 100"
command = hadoop_command + hadoop_config + hadoop_run
print(command)
[input, output, err] = os.popen3(command)
for line in output:
    print line
for line in err:
    print line
# shut down hadoop/mapreduce
stopHadoop_comm = "bin/stop-mapred.sh"
[input, output, err] = os.popen3(stopHadoop_comm + hadoop_config)
for line in output:
    print line
for line in err:
    print line
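The value-rewriting loop in the script can be pulled out into a small function, which makes its assumption explicit: the value line sits immediately after the name line. This is only a minimal Python 3 sketch, not the script as submitted. The function name replace_property_value, the host name, and the port in the sample data are placeholders I made up for illustration.

```python
def replace_property_value(lines, property_name, new_value):
    """Return a copy of `lines` in which the line right after the
    <name>property_name</name> line is replaced by a <value> line.

    Assumes, like the script above, that the value element sits on the
    line immediately after the name element.
    """
    marker = "<name>%s</name>" % property_name
    result = []
    replace_next = False
    for line in lines:
        if replace_next:
            # Keep the original indentation of the value line.
            indent = line[:len(line) - len(line.lstrip())]
            result.append("%s<value>%s</value>\n" % (indent, new_value))
            replace_next = False
            continue
        if marker in line:
            replace_next = True
        result.append(line)
    return result


# Hypothetical input; host name and port are placeholders.
sample = [
    "<property>\n",
    "  <name>mapred.job.tracker</name>\n",
    "  <value>localhost:9001</value>\n",
    "</property>\n",
]
updated = replace_property_value(sample, "mapred.job.tracker", "node042:9001")
print("".join(updated), end="")
```

Working on a list of lines rather than on open file handles keeps the function easy to try out without touching the real configuration files.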
hadoop_conf/core-site.xml:
hadoop_conf/mapred-site.xml:
Step 3: Now it's time to make MapReduce and HDFS work together. One thing that sets HDFS apart from MapReduce is that you need to format the file system first. When you type the shell command "bin/hadoop namenode -format", it asks you for a "Y" to confirm. So I play a dirty trick here: a file containing a "Y" and a newline serves as the input for the format command. Here is the new version of hadoop_test.py:
#!/usr/bin/python
import os
import sys
templateDir = sys.path[0]
print(templateDir)
machineFileDir = os.path.join(templateDir, 'hadoop_conf/slaves')
configDir = os.path.join(templateDir, 'hadoop')
hosts = os.environ["LSB_HOSTS"]
hostList = hosts.split(' ')
hostList = list(set(hostList)) # remove duplicate host
# write to slaves files
fh = open(machineFileDir, 'w')
for host in hostList:
    fh.write(host)
    fh.write('\n')
fh.close()
# get the network address for the jobtracker.
import platform
nodeName = platform.node()
mapredFileName = os.path.join(templateDir, "hadoop_conf/mapred-site.xml")
mapredFileNameBack = os.path.join(templateDir, "hadoop_conf/mapred-site.xml.back")
fhr = open(mapredFileName, 'r')
fhw = open(mapredFileNameBack, 'w')
# here we assume the line right after the mapred.job.tracker line
# is the value line.
mapredJobTrackerLine = False
for line in fhr:
    if "mapred.job.tracker<" in line:
        mapredJobTrackerLine = True
    elif mapredJobTrackerLine == True:
        mapredJobTrackerLine = False
        fhw.write("
        continue
    fhw.write(line)
fhr.close()
fhw.close()
os.remove(mapredFileName)
os.rename(mapredFileNameBack, mapredFileName)
# get the network address for the namenode
hdfsFileName = os.path.join(templateDir, "hadoop_conf/core-site.xml")
hdfsFileNameBack = os.path.join(templateDir, "hadoop_conf/core-site.xml.back")
fhr = open(hdfsFileName, 'r')
fhw = open(hdfsFileNameBack, 'w')
hdfsLine = False
for line in fhr:
    if "fs.default.name<" in line:
        hdfsLine = True
    elif hdfsLine == True:
        hdfsLine = False
        fhw.write("
        continue
    fhw.write(line)
fhr.close()
fhw.close()
os.remove(hdfsFileName)
os.rename(hdfsFileNameBack, hdfsFileName)
# format the newly created hdfs
hadoop_config = " --config " + os.path.join(templateDir, 'hadoop_conf')
hadoop_home = os.environ["HADOOP_HOME"]
os.chdir(hadoop_home)
format_comm = "bin/hadoop " + hadoop_config + " namenode -format < Y"
[input, output, err] = os.popen3(format_comm)
for line in output:
    print line
for line in err:
    print line
# start hadoop/HDFS part
starthdfs_comm = "bin/start-dfs.sh "
[input, output, err] = os.popen3(starthdfs_comm + hadoop_config)
for line in output:
    print line
for line in err:
    print line
# start hadoop/MapReduce part
startHadoop_comm = "bin/start-mapred.sh "
[input, output, err] = os.popen3(startHadoop_comm + hadoop_config)
for line in output:
    print line
for line in err:
    print line
# run the hadoop job
hadoop_command = "hadoop "
hadoop_run = " jar hadoop-0.20.2-examples.jar pi 100 100"
#command = "bin/hadoop jar hadoop-0.20.2-examples.jar pi 2 100"
command = hadoop_command + hadoop_config + hadoop_run
print(command)
[input, output, err] = os.popen3(command)
for line in output:
    print line
for line in err:
    print line
# shut down hadoop/mapreduce
stopHadoop_comm = "bin/stop-mapred.sh"
[input, output, err] = os.popen3(stopHadoop_comm + hadoop_config)
for line in output:
    print line
for line in err:
    print line
# shut down hadoop/hdfs
stophdfs_comm = "bin/stop-dfs.sh"
[input, output, err] = os.popen3(stophdfs_comm + hadoop_config)
for line in output:
    print line
for line in err:
    print line
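The script relies on os.popen3, which exists only in Python 2. Under Python 3 the same run-and-print pattern could be written with the subprocess module, and the "Y" confirmation could be fed on standard input instead of coming from a file. The following is a rough sketch under those assumptions; the helper name run_and_print is my own, and the demo command is just a stand-in for a program that waits for a confirmation.

```python
import subprocess
import sys


def run_and_print(command, stdin_text=None):
    """Run `command` (a list of arguments), echo its stdout and stderr,
    and return the exit code. `stdin_text`, if given, is fed to the
    process on standard input."""
    completed = subprocess.run(
        command,
        input=stdin_text,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,  # decode output as text
    )
    sys.stdout.write(completed.stdout)
    sys.stdout.write(completed.stderr)
    return completed.returncode


# Stand-in for a command that waits for a confirmation on stdin.
# With Hadoop this would be the namenode format command instead.
demo = [
    sys.executable,
    "-c",
    "import sys; print('got ' + sys.stdin.readline().strip())",
]
exit_code = run_and_print(demo, stdin_text="Y\n")
```

With this helper, the format step from the script would presumably become a call such as run_and_print(["bin/hadoop", "--config", conf_dir, "namenode", "-format"], stdin_text="Y\n"), where conf_dir is the hadoop_conf directory. Because subprocess.run collects both streams before returning, the output appears only after the command has finished.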
core-site.xml:
hdfs-site.xml:
At this stage, both components of Hadoop can be started and stopped within a job session.
Note that Platform Computing, which produces LSF, already has its own Hadoop support.
Note that Hadoop has its own scheduler.
However, I found some errors after trying this script several times. When you format the namenode, the datanodes are not aware of it, and since we don't change the path where the datanode stores its data, there will be an error like this:
java.io.IOException: Incompatible namespaceIDs
The solution is either to remove the datanode's storage directory completely, which is fine in our case, or to update the namespaceID on the datanode. Searching the web for "Incompatible namespaceIDs" will turn up the details. To remove the directory, we can issue
ssh datanode_name rm -rf /path/to/datanode/storage
for every datanode.
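Doing this by hand for every node gets tedious, so the cleanup could be scripted as well. Here is a small Python 3 sketch that builds one ssh command per distinct host. The function names, the host names, and the dry_run switch are my own illustration; the storage path is the same placeholder as in the command above.

```python
import subprocess


def build_cleanup_commands(hosts, storage_path):
    """Return one ssh command (as an argument list) per distinct host."""
    commands = []
    for host in sorted(set(hosts)):
        commands.append(["ssh", host, "rm", "-rf", storage_path])
    return commands


def clean_datanodes(hosts, storage_path, dry_run=True):
    """Print each cleanup command; execute it only when dry_run is False."""
    for command in build_cleanup_commands(hosts, storage_path):
        print(" ".join(command))
        if not dry_run:
            subprocess.call(command)


# Host names and path are placeholders; in the job script the hosts
# would come from the LSB_HOSTS environment variable.
clean_datanodes("node01 node01 node02".split(), "/path/to/datanode/storage")
```

The dry run is the default on purpose: rm -rf on the wrong path is not something to discover after the fact.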
Another error comes up when the script runs: the datanode is still starting, but the namenode already accepts service requests. The result is "file could only be replicated to 0 nodes, instead of 1". The solution is to wait a few seconds before submitting the job, since right now there is no easy way to check HDFS health, especially datanode status, from a program. On opportunity.neu.edu, for example, the wait can be as long as 30 seconds during rush hour. (Update: with 0.20, this error sometimes occurred even when I delayed long enough for the datanode to start. After upgrading to a newer version, the problem seems to have gone away.)
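Instead of a fixed delay, one could poll until a datanode shows up and give up after a timeout. The sketch below (Python 3) assumes that the output of "bin/hadoop dfsadmin -report" contains a line of the form "Datanodes available: N"; I have not verified that wording across versions, so treat the pattern as an assumption. The function names are my own, and the example runs on simulated report text rather than on a real cluster.

```python
import re
import time


def count_available_datanodes(report_text):
    """Extract the datanode count from report text.

    Assumes a line of the form 'Datanodes available: N ...'; the exact
    wording may differ between Hadoop versions.
    """
    match = re.search(r"Datanodes available:\s*(\d+)", report_text)
    return int(match.group(1)) if match else 0


def wait_until(check, timeout_seconds, interval_seconds=1.0):
    """Call `check()` repeatedly until it returns True or time runs out.
    Returns True on success, False on timeout."""
    deadline = time.time() + timeout_seconds
    while True:
        if check():
            return True
        if time.time() >= deadline:
            return False
        time.sleep(interval_seconds)


# Simulated reports standing in for repeated calls to the real command.
reports = iter([
    "Datanodes available: 0 (0 total, 0 dead)",
    "Datanodes available: 2 (2 total, 0 dead)",
])
ready = wait_until(
    lambda: count_available_datanodes(next(reports)) >= 1,
    timeout_seconds=5,
    interval_seconds=0.01,
)
print(ready)
```

In the job script, the check would run the report command and pass its output to count_available_datanodes, and the job would be submitted only if wait_until returns True.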
There are many places where we can continue studying Hadoop. For example, we can get familiar with HDFS using the information at:
http://hadoop.apache.org/common/docs/r0.20.0/hdfs_shell.html