Hands-on Exercise 4: Programming Hadoop-Blast
Content
- Overview
- Blast input files handler (DataFileInputFormat & FileRecordReader)
- DataFileInputFormat class
- FileRecordReader class
- Write a Mapper
- Setup the Main Program
- Entire Scope of the Hadoop-Blast Program
1. Overview
Similar to the Hadoop-WordCount program, we need to implement the following parts for the Blast program:
- Blast input files handler
- Mapper
- Main program
Because Blast does not need a Reducer to process the intermediate map output, we do not implement one.
2. Blast input files handler (DataFileInputFormat & FileRecordReader)
The input of a standalone Blast program is a single FASTA file, whereas the input of the Hadoop-Blast program is a set of FASTA files. We define two Java classes, DataFileInputFormat and FileRecordReader, which take the input files uploaded to HDFS and create key-value pairs for the Mapper.
3. DataFileInputFormat class
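import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Hands each input split to a FileRecordReader.
public class DataFileInputFormat extends FileInputFormat<String, String> {
    @Override
    public RecordReader<String, String> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new FileRecordReader();
    }
}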
4. FileRecordReader class
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Emits exactly one record per split: key = file name, value = full file path.
public class FileRecordReader extends RecordReader<String, String> {
    private Path path;
    private FileSystem fs;
    private boolean done = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        path = ((FileSplit) split).getPath();
        fs = path.getFileSystem(context.getConfiguration());
    }

    @Override
    public float getProgress() throws IOException {
        System.out.println("in getProgress : " + done);
        // Only two states exist: the single record has been handed out or it has not.
        return done ? 1.0f : 0.0f;
    }

    @Override
    public String getCurrentKey() throws IOException, InterruptedException {
        System.out.println("in current key " + path.toString() + " :" + done);
        // Path.getName() already returns the final path component (the file name).
        return path.getName();
    }

    @Override
    public String getCurrentValue() throws IOException, InterruptedException {
        System.out.println(" get Current Value " + path.toString() + " :" + done);
        return path.toString();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        System.out.println("next keyvalue : " + path.toString() + " :" + done);
        if (done) {
            return false;
        }
        // The first call succeeds; every later call reports that no records remain.
        done = true;
        return true;
    }

    @Override
    public void close() throws IOException {
        done = true;
    }
} // end of FileRecordReader
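To make the one-record-per-file behavior concrete, here is a minimal plain-Java sketch of how a caller drives such a reader. It deliberately avoids the Hadoop types so it runs on its own; the class names OneRecordPerFileSketch and OneShotReader and the sample HDFS path are assumptions made for illustration, not part of Hadoop-Blast.

```java
import java.util.ArrayList;
import java.util.List;

class OneRecordPerFileSketch {
    /** Plain-Java stand-in for FileRecordReader: yields one (file name, full path) pair. */
    static class OneShotReader {
        private final String fullPath;
        private boolean done = false;

        OneShotReader(String fullPath) {
            this.fullPath = fullPath;
        }

        boolean nextKeyValue() {
            if (done) {
                return false;
            }
            done = true;
            return true;
        }

        String getCurrentKey() {
            // Keep only the text after the last '/', i.e. the file name.
            return fullPath.substring(fullPath.lastIndexOf('/') + 1);
        }

        String getCurrentValue() {
            return fullPath;
        }
    }

    /** Mimics the framework loop: ask for the next record, then read key and value. */
    static List<String> drain(OneShotReader reader) {
        List<String> records = new ArrayList<>();
        while (reader.nextKeyValue()) {
            records.add(reader.getCurrentKey() + " -> " + reader.getCurrentValue());
        }
        return records;
    }

    public static void main(String[] args) {
        // Hypothetical input location, used only for this demonstration.
        OneShotReader reader = new OneShotReader("hdfs://namenode/user/demo/input/input_1.fa");
        System.out.println(drain(reader));
    }
}
```

Draining the same reader a second time yields nothing, which is the signal that tells the caller the split is exhausted.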
5. Write a Mapper
The Mapper mainly creates a Java process that calls the Blast program. The map key is the file name, and the map value is the full HDFS path of the uploaded input file. Each map task downloads its assigned input file from HDFS and passes it to the Blast program as input.
At the beginning of the job, each worker gets the cached Blast program and database from HDFS by overriding the setup function.
public class RunnerMap extends Mapper<String, String, IntWritable, Text> {
    private String localDB = "";
    private String localBlastProgram = "";

    @Override
    public void setup(Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        // Local paths of the cached archives unpacked on this node
        Path[] local = DistributedCache.getLocalCacheArchives(conf);
        this.localDB = local[0].toUri().getPath() + File.separator
                + conf.get(DataAnalysis.DB_ARCHIVE) + File.separator
                + conf.get(DataAnalysis.DB_NAME);
        this.localBlastProgram = local[0].toUri().getPath();
        ...
        // other parts
        ...
} // end of RunnerMap
Notes for code above
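- The call to DistributedCache.getLocalCacheArchives(conf) gets the local-disk paths of the downloaded and unpacked archive that contains the Blast program and database.
- The assignment to this.localDB sets the path of the Blast database inside that archive.
- The assignment to this.localBlastProgram sets the path of the Blast program, which is the root of the unpacked archive.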
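The path assembly in setup can be tried out without a cluster. The sketch below is plain Java under stated assumptions: the class LocalCachePathsSketch, its method names, and the sample archive location, directory name, and database name are all hypothetical. It also guards against a missing archive list, on the assumption that the list may be null or empty when nothing was cached for the task.

```java
import java.io.File;

class LocalCachePathsSketch {
    /** Returns the first localized archive, failing early if none is available. */
    static String archiveRoot(String[] localArchives) {
        if (localArchives == null || localArchives.length == 0) {
            throw new IllegalStateException("no cached archive was localized for this task");
        }
        return localArchives[0];
    }

    /** Joins archive root, database directory, and database name, as setup does. */
    static String dbPath(String archiveRoot, String dbArchiveDir, String dbName) {
        return archiveRoot + File.separator + dbArchiveDir + File.separator + dbName;
    }

    public static void main(String[] args) {
        // Hypothetical values standing in for the cache path and the two configuration entries.
        String root = archiveRoot(new String[] {"/tmp/hadoop/cache/BlastProgramAndDB.tar.gz"});
        System.out.println("program dir: " + root);
        System.out.println("database:    " + dbPath(root, "db", "nr"));
    }
}
```

Failing with a clear message in archiveRoot is a design choice for the sketch: it turns a bare array-index or null-pointer error into a message that points at the missing cache entry.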
Then, we call the Blast program with the related database and input file.
public void map(String key, String value, Context context) throws IOException,
        InterruptedException {
    ...
    // some parameters setting
    ...
    // download the file from HDFS
    Path inputFilePath = new Path(value);
    FileSystem fs = inputFilePath.getFileSystem(conf);
    fs.copyToLocalFile(inputFilePath, new Path(localInputFile));
    // Prepare the arguments to the executable
    String execCommand = cmdArgs.replaceAll("#_INPUTFILE_#", localInputFile);
    if (cmdArgs.indexOf("#_OUTPUTFILE_#") > -1) {
        execCommand = execCommand.replaceAll("#_OUTPUTFILE_#", outFile);
    } else {
        outFile = stdOutFile;
    }
    execCommand = this.localBlastProgram + File.separator + execName + " " + execCommand + " -db " + this.localDB;
    // Create the external process
    Process p = Runtime.getRuntime().exec(execCommand);
    ...
    p.waitFor();
    // Upload the results to HDFS
    Path outputDirPath = new Path(outputDir);
    Path outputFileName = new Path(outputDirPath, fileNameOnly);
    fs.copyFromLocalFile(new Path(outFile), outputFileName);
    ...
} // end of overriding the map
Notes for code above
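- The fs.copyToLocalFile call downloads the (Blast) input file from HDFS to the local path "localInputFile".
- The final assignment to execCommand assembles the normal Blast command line (executable path, arguments, and "-db" with the local database path) that the exec call will invoke.
- Runtime.getRuntime().exec(execCommand) creates an external process for the (Blast) executable, and p.waitFor() waits for it to finish.
- The fs.copyFromLocalFile call copies the (Blast) output back to HDFS.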
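The command construction and process launch in map can be sketched in plain Java, with no Hadoop dependency. This is an illustration under assumptions, not the Hadoop-Blast implementation: the class BlastCommandSketch, its method names, the sample "-query" and "-out" arguments, and all sample paths are hypothetical. It fills the placeholders with String.replace, which substitutes literally, whereas replaceAll gives '$' and '\' in the replacement text a special meaning. It also sends the child's output to a file so the child cannot stall on a full output pipe. Splitting on whitespace assumes that no argument or path contains spaces.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BlastCommandSketch {
    static final String INPUT_TOKEN = "#_INPUTFILE_#";
    static final String OUTPUT_TOKEN = "#_OUTPUTFILE_#";

    /** Substitutes both placeholders literally; a template without them is returned unchanged. */
    static String fillTemplate(String cmdArgs, String inputFile, String outputFile) {
        return cmdArgs.replace(INPUT_TOKEN, inputFile).replace(OUTPUT_TOKEN, outputFile);
    }

    /** Builds the argument list: executable, filled arguments, then "-db" and the database path. */
    static List<String> buildCommand(String programDir, String execName,
                                     String filledArgs, String dbPath) {
        List<String> command = new ArrayList<>();
        command.add(programDir + File.separator + execName);
        String trimmed = filledArgs.trim();
        if (!trimmed.isEmpty()) {
            command.addAll(Arrays.asList(trimmed.split("\\s+")));
        }
        command.add("-db");
        command.add(dbPath);
        return command;
    }

    /** Runs the command, writes stdout and stderr to logFile, and returns the exit code. */
    static int run(List<String> command, Path logFile) throws IOException, InterruptedException {
        ProcessBuilder builder = new ProcessBuilder(command);
        builder.redirectErrorStream(true);         // merge stderr into stdout
        builder.redirectOutput(logFile.toFile());  // write output to a file instead of a pipe
        return builder.start().waitFor();
    }

    public static void main(String[] args) throws Exception {
        String filled = fillTemplate("-query #_INPUTFILE_# -out #_OUTPUTFILE_#",
                "/tmp/input_1.fa", "/tmp/input_1.out");
        System.out.println(buildCommand("/opt/blast/bin", "blastx", filled, "/opt/blast/db/nr"));

        // Demonstrate run() with a program that exists wherever this code runs: the JVM itself.
        String java = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
        Path log = Files.createTempFile("sketch", ".log");
        System.out.println("exit code: " + run(Arrays.asList(java, "-version"), log));
    }
}
```

Passing the command as a list rather than a single string leaves no ambiguity about where one argument ends and the next begins. A non-zero exit code from run is the natural place to decide whether a failed Blast invocation should fail the map task.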
6. Setup the Main Program
The most important part of the main program is to set the cached archive and the input format:
// other parts
...
Configuration conf = new Configuration();
Job job = new Job(conf, execName);
// Starting the data analysis.
Configuration jc = job.getConfiguration();
...
// other parts
...
// using distributed cache
DistributedCache.addCacheArchive(new URI(BlastProgramAndDB), jc);
...
// other parts
...
// input and output format
job.setInputFormatClass(DataFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setNumReduceTasks(numReduceTasks);
Notes for code above
- The DistributedCache.addCacheArchive call adds the BlastProgramAndDB archive to the DistributedCache for this job.
- The job.setInputFormatClass call sets the InputFormatClass to our customized "DataFileInputFormat" class.