Hands-on Exercise 4: Programming the Hadoop-Blast

Content

  1. Overview
  2. Blast input file handler (DataFileInputFormat & FileRecordReader)
  3. DataFileInputFormat class
  4. FileRecordReader class
  5. Write a Mapper
  6. Setup the Main Program
  7. Entire Scope of the Hadoop-Blast Program

1. Overview

Similar to the Hadoop-WordCount program, we need to implement the following parts for the Blast program:

    1. Blast input file handler
    2. Mapper
    3. Main program

In this case, since the Blast program does not need a Reducer to process the intermediate map output, we do not implement one.
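
If no Reducer class is configured, Hadoop falls back to its default identity Reducer, which passes the map output through unchanged; alternatively the number of reduce tasks can be set to zero so that the map output is written directly by the output format (see the job setup in Section 6). Either way, the key-value pairs emitted by the Mapper end up in the job output as-is.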

2. Blast input file handler (DataFileInputFormat & FileRecordReader)


The input of a standalone Blast program is a single FASTA file, whereas the input of the Hadoop-Blast program is a set of FASTA files. We define two Java classes, DataFileInputFormat and FileRecordReader, to collect the uploaded input files from HDFS and create key-value pairs for the Mapper.

3. DataFileInputFormat class

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class DataFileInputFormat extends FileInputFormat<String, String> {
      @Override
      public RecordReader<String, String> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
          // Turn the file behind each split into a single (file name, HDFS path) pair.
          return new FileRecordReader();
      }
}
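
The class above keeps FileInputFormat's default split generation. Since the FileRecordReader below treats every split as one whole file, it can help to make that intent explicit by preventing Hadoop from splitting large FASTA files. The following optional override is only a sketch added here for illustration; it is not part of the original excerpt and additionally needs the org.apache.hadoop.mapreduce.JobContext and org.apache.hadoop.fs.Path imports:

      // Optional (not in the original class): treat each FASTA file as
      // non-splittable so that a file is never spread over several map tasks.
      @Override
      protected boolean isSplitable(JobContext context, Path filename) {
          return false;
      }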

4. FileRecordReader class

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class FileRecordReader extends RecordReader<String, String> {
      private Path path;
      private FileSystem fs;
      private boolean done = false;

      @Override
      public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
          // Remember the file behind this split and the file system it lives on.
          path = ((FileSplit) split).getPath();
          fs = path.getFileSystem(context.getConfiguration());
      }

      @Override
      public float getProgress() throws IOException {
          System.out.println("in getProgress : " + done);
          // Each split produces a single record, so progress is either 0 or 1.
          return done ? 1.0f : 0.0f;
      }

      @Override
      public String getCurrentKey() throws IOException, InterruptedException {
          System.out.println("in current key " + path.toString() + " :" + done);
          // The key is the plain file name of the input file.
          return path.getName();
      }

      @Override
      public String getCurrentValue() throws IOException, InterruptedException {
          System.out.println("in current value " + path.toString() + " :" + done);
          // The value is the full HDFS path of the input file.
          return path.toString();
      }

      @Override
      public boolean nextKeyValue() throws IOException, InterruptedException {
          System.out.println("next keyvalue : " + path.toString() + " :" + done);
          // Emit exactly one key-value pair per split, then report end of input.
          if (done) {
              return false;
          } else {
              done = true;
              return true;
          }
      }

      @Override
      public void close() throws IOException {
          done = true;
      }
} // end of FileRecordReader
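
For a hypothetical uploaded file such as hdfs://namenode:9000/user/hadoop/blast_input/seq_1.fa (an illustrative path, not one from the exercise), nextKeyValue() returns true exactly once; getCurrentKey() then returns "seq_1.fa" and getCurrentValue() returns the full HDFS path. The next call to nextKeyValue() returns false, so every split yields exactly one key-value record for the Mapper.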

5. Write a Mapper

The Mapper mainly launches an external process from Java to run the Blast program. The map key is the file name, and the map value is the full HDFS path of the uploaded input file. Each map task downloads its assigned input file from HDFS and passes it to the Blast program.

At the start of each task, the worker locates the locally cached Blast program and database (shipped through the DistributedCache) by overriding the setup method.

public class RunnerMap extends Mapper<String, String, IntWritable, Text> {
      private String localDB = "";
      private String localBlastProgram = "";

      @Override
      public void setup(Context context) throws IOException {
          Configuration conf = context.getConfiguration();
          // Local path of the archive shipped through the DistributedCache
          // (the Blast program together with its database).
          Path[] local = DistributedCache.getLocalCacheArchives(conf);
          this.localDB = local[0].toUri().getPath() + File.separator
                + conf.get(DataAnalysis.DB_ARCHIVE) + File.separator
                + conf.get(DataAnalysis.DB_NAME);
          this.localBlastProgram = local[0].toUri().getPath();
      }
      ...
      // other parts
      ...
} // end of RunnerMap

Notes for code above
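
DistributedCache.getLocalCacheArchives(conf) returns the local paths under which the archives registered in the main program (see Section 6) have been unpacked on the worker node. Here local[0] is the Blast program-and-database archive, so localBlastProgram points at its unpacked root directory, and localDB is assembled from that root plus the archive name (configuration key DataAnalysis.DB_ARCHIVE) and the database name (configuration key DataAnalysis.DB_NAME).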

Then, in the map method, we call the Blast program with the corresponding database and input file.

public void map(String key, String value, Context context)
      throws IOException, InterruptedException {
      ...
      // some parameters setting
      ...
      // download the input file from HDFS to the local file system
      Path inputFilePath = new Path(value);
      FileSystem fs = inputFilePath.getFileSystem(conf);
      fs.copyToLocalFile(inputFilePath, new Path(localInputFile));

      // prepare the arguments to the executable
      String execCommand = cmdArgs.replaceAll("#_INPUTFILE_#", localInputFile);
      if (cmdArgs.indexOf("#_OUTPUTFILE_#") > -1) {
          execCommand = execCommand.replaceAll("#_OUTPUTFILE_#", outFile);
      } else {
          outFile = stdOutFile;
      }
      execCommand = this.localBlastProgram + File.separator + execName + " "
            + execCommand + " -db " + this.localDB;

      // create the external process that runs the Blast executable
      Process p = Runtime.getRuntime().exec(execCommand);
      ...
      p.waitFor();

      // upload the results to HDFS
      Path outputDirPath = new Path(outputDir);
      Path outputFileName = new Path(outputDirPath, fileNameOnly);
      fs.copyFromLocalFile(new Path(outFile), outputFileName);
      ...
} // end of overriding the map

Notes for code above
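
The map method copies its assigned input file from HDFS to the local file system, builds the command line by substituting the #_INPUTFILE_# and #_OUTPUTFILE_# placeholders in the configured argument template (falling back to a standard-output file when no output placeholder is present), prefixes the Blast executable path and appends the -db option with the local database, runs the command as an external process, waits for it to finish, and finally copies the result file back to the job's output directory on HDFS.

One detail elided above is the handling of the child process's standard output and error streams; if they are never read, a process that writes a lot of output can fill the pipe buffers and block. The following is only a sketch of one way to drain them (the drainStream helper name is ours, not part of the original code):

      // Sketch only: drain a stream of the external Blast process in a
      // background thread so the process cannot block on a full pipe buffer.
      // Requires java.io.BufferedReader, InputStream, InputStreamReader, IOException.
      private static void drainStream(final InputStream in) {
          new Thread(new Runnable() {
              public void run() {
                  try {
                      BufferedReader reader =
                            new BufferedReader(new InputStreamReader(in));
                      while (reader.readLine() != null) {
                          // discard (or log) each line of Blast output
                      }
                      reader.close();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
          }).start();
      }

      // Usage, right after creating the process and before waiting on it:
      //     drainStream(p.getInputStream());
      //     drainStream(p.getErrorStream());
      //     p.waitFor();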

6. Setup the Main Program

The most important parts of the main program are setting the cached archive and the input format:

      // other parts
      ...
      Configuration conf = new Configuration();
      Job job = new Job(conf, execName);
      // Starting the data analysis.
      Configuration jc = job.getConfiguration();
      ...
      // other parts
      ...
      // using distributed cache
      DistributedCache.addCacheArchive(new URI(BlastProgramAndDB), jc);
      ...
      // other parts
      ...
      // input and output format
      job.setInputFormatClass(DataFileInputFormat.class);
      job.setOutputFormatClass(SequenceFileOutputFormat.class);
      job.setNumReduceTasks(numReduceTasks);

Notes for code above
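
DistributedCache.addCacheArchive registers the archive that holds the Blast binaries and the formatted database (BlastProgramAndDB), so Hadoop unpacks it on every worker node before the tasks start; this is what RunnerMap.setup relies on in Section 5. setInputFormatClass(DataFileInputFormat.class) plugs in the input handler from Sections 3 and 4, and SequenceFileOutputFormat writes the map output pairs to a SequenceFile.

The remaining "other parts" of the main program are elided above; the following is a minimal sketch of the kind of wiring they typically contain (the class and path names are illustrative assumptions, not taken from the original program):

      // Illustrative only: typical remaining job wiring (names are assumptions)
      job.setJarByClass(DataAnalysis.class);   // assuming DataAnalysis is the main class
      job.setMapperClass(RunnerMap.class);
      job.setOutputKeyClass(IntWritable.class);
      job.setOutputValueClass(Text.class);
      FileInputFormat.setInputPaths(job, new Path(inputDir));    // HDFS directory of FASTA files
      FileOutputFormat.setOutputPath(job, new Path(outputDir));  // HDFS directory for results
      job.waitForCompletion(true);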
