Twister Kmeans User Guide
 

SALSA Group
PTI Indiana University
June 17th 2010

Contents

  1. Introduction
  2. Execution Guide for Twister-Kmeans
    1. Prerequisites
    2. Input
    3. Distributing Data
    4. Creating Partition File
    5. Executing Twister-Kmeans
    6. Summary of Steps
  3. Development Guide for Twister-Kmeans
    1. Prerequisites
    2. Related Code Files
    3. Code Analysis
      1. Implementing the kmeans clustering
      2. Implementing the map task segment

1. Introduction

In statistics and machine learning, kmeans clustering is a method of cluster analysis that aims to partition n observations into k clusters, where each observation belongs to the cluster with the nearest mean [Wikipedia].
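For reference, the standard kmeans iteration (Lloyd's algorithm) alternates the two steps below; the notation is ours, not the guide's. With the data split into M partitions P_1, ..., P_M, let S_{m,j} be the sum of the points in P_m assigned to center j and n_{m,j} their number. The last line shows why the work splits naturally across map tasks: each partition contributes only a sum and a count per center.

```latex
\begin{aligned}
c(i) &= \arg\min_{j \in \{1,\dots,k\}} \lVert x_i - \mu_j \rVert^2
  && \text{(assign each observation to its nearest center)} \\
\mu_j &= \frac{1}{\lvert C_j \rvert} \sum_{i \in C_j} x_i, \qquad C_j = \{\, i : c(i) = j \,\}
  && \text{(move each center to the mean of its cluster)} \\
\mu_j &= \frac{\sum_{m=1}^{M} S_{m,j}}{\sum_{m=1}^{M} n_{m,j}}, \qquad
  S_{m,j} = \sum_{i \in P_m \cap C_j} x_i, \quad n_{m,j} = \lvert P_m \cap C_j \rvert
  && \text{(the same mean, assembled from } M \text{ partitions)}
\end{aligned}
```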

In each iteration of Twister-Kmeans, all the map tasks receive the same input data (the current cluster centers), and each computes partial cluster centers by going through the 3D data points it owns. A reduce task computes the average of the partial cluster centers and produces the cluster centers for the next step. Once the main program gets these new cluster centers, it calculates the difference between the new and the previous cluster centers and then determines whether another cycle of MapReduce computation is needed.
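The workflow above can be simulated in a single process to see how the pieces fit together. This is a minimal sketch that uses no Twister classes; every class and method name is hypothetical. Here each "map" call emits per-center coordinate sums plus a point count rather than finished averages, so that the "reduce" step can form an exact weighted average; whether the Twister sample does exactly this is not stated in this guide. The getError() method plays the role of the getError(cData, newCData) call in the driver loop shown in the Development Guide, whose actual definition is not shown there. Requires Java 9+ for List.of.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Single-process simulation of the Twister-Kmeans iteration (no Twister classes used). */
final class KMeansIterationSketch {

    /** Index of the center closest to p, by squared Euclidean distance. */
    static int nearest(double[] p, double[][] centers) {
        int best = 0;
        double bestDist = Double.MAX_VALUE;
        for (int c = 0; c < centers.length; c++) {
            double d = 0;
            for (int j = 0; j < p.length; j++) {
                double diff = p[j] - centers[c][j];
                d += diff * diff;
            }
            if (d < bestDist) {
                bestDist = d;
                best = c;
            }
        }
        return best;
    }

    /** "Map": per-center coordinate sums for one partition; column dim holds the point count. */
    static double[][] map(double[][] partition, double[][] centers) {
        int dim = centers[0].length;
        double[][] partial = new double[centers.length][dim + 1];
        for (double[] p : partition) {
            int c = nearest(p, centers);
            for (int j = 0; j < dim; j++) partial[c][j] += p[j];
            partial[c][dim] += 1;
        }
        return partial;
    }

    /** "Reduce": merge all partial sums and divide by the counts to get the new centers. */
    static double[][] reduce(List<double[][]> partials, double[][] oldCenters) {
        int k = oldCenters.length, dim = oldCenters[0].length;
        double[][] next = new double[k][];
        for (int c = 0; c < k; c++) {
            double[] sum = new double[dim + 1];
            for (double[][] part : partials) {
                for (int j = 0; j <= dim; j++) sum[j] += part[c][j];
            }
            if (sum[dim] == 0) {                  // empty cluster: keep its old center
                next[c] = oldCenters[c].clone();
                continue;
            }
            next[c] = new double[dim];
            for (int j = 0; j < dim; j++) next[c][j] = sum[j] / sum[dim];
        }
        return next;
    }

    /** Total Euclidean distance the centers moved between two iterations. */
    static double getError(double[][] oldCenters, double[][] newCenters) {
        double total = 0;
        for (int c = 0; c < oldCenters.length; c++) {
            double sq = 0;
            for (int j = 0; j < oldCenters[c].length; j++) {
                double diff = newCenters[c][j] - oldCenters[c][j];
                sq += diff * diff;
            }
            total += Math.sqrt(sq);
        }
        return total;
    }

    /** "Main program": send centers to every partition, reduce, and test for convergence. */
    static double[][] run(List<double[][]> partitions, double[][] centers, double threshold, int maxLoops) {
        for (int loop = 0; loop < maxLoops; loop++) {
            List<double[][]> partials = new ArrayList<>();
            for (double[][] partition : partitions) partials.add(map(partition, centers));
            double[][] next = reduce(partials, centers);
            double error = getError(centers, next);
            centers = next;
            if (error < threshold) break;
        }
        return centers;
    }

    public static void main(String[] args) {
        List<double[][]> partitions = List.of(
            new double[][] {{0, 0, 0}, {1, 0, 0}},
            new double[][] {{10, 10, 10}, {11, 10, 10}});
        double[][] initial = {{0, 0, 0}, {10, 10, 10}};
        System.out.println(Arrays.deepToString(run(partitions, initial, 1e-6, 100)));
        // prints [[0.5, 0.0, 0.0], [10.5, 10.0, 10.0]]
    }
}
```

In this toy run the centers move by a total of 1.0 in the first iteration and by 0 in the second, so the loop stops after two iterations.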
 

Figure 1. The workflow of kmeans application with Twister MapReduce framework

2. Execution Guide for Twister-Kmeans

  1. Prerequisites

    Make sure you have completed the following steps before executing the Twister-Kmeans application.

    • If you are on FutureGrid machines, simply start NaradaBrokering and Twister as shown below.
      cd $NBHOME/bin 
      ./startbr.sh
      cd $TWISTER_HOME/bin 
      ./start_twister.sh

      If not, set up Twister and NaradaBrokering first. See Twister Installation for details.
    • If Twister-Kmeans-${Release}.jar is not already present in the $TWISTER_HOME/apps/ directory, copy it there as follows.
      cp $TWISTER_HOME/samples/kmeans/dist/lib/Twister-Kmeans-*.jar $TWISTER_HOME/apps

  2. Input

    The input files for this application are located under $TWISTER_HOME/samples/kmeans/input . This directory contains 8 files holding a total of 80000 three-dimensional data points.


  3. Distributing Data

    The next step is to distribute the partitioned data files uniformly across the compute nodes. You may do this manually if you prefer; the steps below, however, use the built-in functions of $TWISTER_HOME/bin/twister.sh .

    • Use the following template to create a directory relative to Twister's common data directory.
      twister.sh mkdir directory_name
      Parameter         Description
      directory_name    The name of the directory to create, relative to Twister's common data directory. The script creates this directory on each compute node, and Twister uses it to hold the data for the particular application. Each Map task therefore reads the data for its execution from this directory on the machine where it runs.

      You can use the following example command to create the default directory:

      cd $TWISTER_HOME/bin 
      ./twister.sh mkdir kmeans
      Note. Twister's data directory is the one specified in $TWISTER_HOME/bin/twister.properties under data_dir property. See Twister Installation for details.
    • Use the following template to distribute the partitioned data files to the directory you created in the previous step. If you used the example command above, this is the kmeans directory inside Twister's common data directory.
      twister.sh put split_directory destination_directory_name
      Parameter                     Description
      split_directory               The qualified name of the directory containing the partitioned data files
      destination_directory_name    The name of the directory, relative to Twister's data directory, that stores data for the particular application

      You can use the following example command to distribute the partitioned data to the default directory:

      cd $TWISTER_HOME/bin 
      ./twister.sh put $TWISTER_HOME/samples/kmeans/input kmeans
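twister.sh put spreads the files across the nodes for you, but this guide does not say which node receives which file. To illustrate what a uniform distribution means, the sketch below assigns files to nodes round-robin. It is a hypothetical stand-in, not twister.sh's actual placement policy, and the file and node names are made up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Round-robin placement of data files on compute nodes (illustrative; not twister.sh's code). */
final class RoundRobinDistribution {

    static Map<String, List<String>> assign(List<String> files, List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("need at least one compute node");
        }
        Map<String, List<String>> plan = new LinkedHashMap<>();   // keeps node order for printing
        for (String node : nodes) {
            plan.put(node, new ArrayList<>());
        }
        for (int i = 0; i < files.size(); i++) {
            // File i goes to node i mod N, so node loads differ by at most one file.
            plan.get(nodes.get(i % nodes.size())).add(files.get(i));
        }
        return plan;
    }

    public static void main(String[] args) {
        List<String> files = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            files.add("kmeans_" + i + ".txt");   // hypothetical names for the 8 input files
        }
        System.out.println(assign(files, List.of("node1", "node2", "node3")));
    }
}
```

With 8 files and 3 nodes, two nodes receive 3 files and one receives 2, which is as even as the split can be.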

  4. Creating Partition File

    The partition file is similar to the partition file in Dryad/DryadLINQ. It records how the input data are distributed across the compute nodes: each line gives the network location of one input file in the format hostname,local_path,filename . Twister uses this information to schedule Map tasks on the compute nodes where their input data are located. To create the partition file, use the create_partition_file.sh script in the $TWISTER_HOME/bin directory as in the following template.

    create_partition_file.sh directory_name file_prefix partition_file
    Parameter         Description
    directory_name    The name of the directory, relative to Twister's data directory, that stores data for the particular application
    file_prefix       The prefix pattern of the partitioned data file names
    partition_file    The qualified name of the file to which the partition information is written

    You can use the following example command to create the default partition file for this application:

    cd $TWISTER_HOME/bin 
    ./create_partition_file.sh kmeans kmeans_ $TWISTER_HOME/samples/kmeans/bin/kmeans.pf
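To show how the hostname,local_path,filename lines can guide scheduling, here is a minimal parser that groups the input files by host, so a scheduler could start each file's map task on the node that holds it. It is a sketch, not Twister's own code: the class name is hypothetical, the host and path values are made up, and skipping blank lines and lines starting with "#" is an assumption about the file, not documented behavior.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PartitionFileSketch {

    /** Groups input files by host, reading lines of the form hostname,local_path,filename. */
    static Map<String, List<String>> filesByHost(BufferedReader in) throws IOException {
        Map<String, List<String>> byHost = new TreeMap<>();   // sorted by host name for stable output
        String line;
        while ((line = in.readLine()) != null) {
            line = line.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;                                      // assumed: blank lines and '#' comments are ignored
            }
            String[] fields = line.split(",");
            if (fields.length != 3) {
                throw new IOException("Malformed partition line: " + line);
            }
            String host = fields[0].trim();
            String path = Paths.get(fields[1].trim(), fields[2].trim()).toString();
            byHost.computeIfAbsent(host, h -> new ArrayList<>()).add(path);
        }
        return byHost;
    }

    public static void main(String[] args) throws IOException {
        String sample = String.join("\n",
            "node1,/data/kmeans,kmeans_0.txt",
            "node2,/data/kmeans,kmeans_1.txt",
            "node1,/data/kmeans,kmeans_2.txt");
        // A locality-aware scheduler would run the map task for each listed file on its host.
        System.out.println(filesByHost(new BufferedReader(new StringReader(sample))));
    }
}
```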

  5. Executing Twister-Kmeans

    You are now ready to execute the Twister-Kmeans application. Use the run_kmeans.sh script in $TWISTER_HOME/samples/kmeans/bin as in the following template.

    run_kmeans.sh init_clusters_file num_map_tasks partition_file
    Parameter            Description
    init_clusters_file   The file that stores the initial centroid points for the Twister-Kmeans application
    num_map_tasks        The desired number of map tasks. This is equal to the number of partitioned data files
    partition_file       The qualified name of the partition file

    You can use the following example command to run the Twister-Kmeans application with the default values:

    cd $TWISTER_HOME/samples/kmeans/bin 
    ./run_kmeans.sh $TWISTER_HOME/samples/kmeans/bin/init_clusters.txt 8 $TWISTER_HOME/samples/kmeans/bin/kmeans.pf

    New centroid points will be printed out on the screen once the execution is completed.
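The guide does not say how init_clusters.txt was produced or what its file format is. One common way to obtain initial centroids (Forgy initialization) is to choose k of the data points at random without replacement; the sketch below does only that selection and does not write a file, since the expected format is not documented here. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

final class InitialCentroidsSketch {

    /** Chooses k of the given points at random, without replacement (Forgy initialization). */
    static List<double[]> pick(List<double[]> points, int k, long seed) {
        if (k <= 0 || k > points.size()) {
            throw new IllegalArgumentException("k must be between 1 and the number of points");
        }
        List<double[]> shuffled = new ArrayList<>(points);
        Collections.shuffle(shuffled, new Random(seed));   // fixed seed gives reproducible runs
        List<double[]> centers = new ArrayList<>(k);
        for (int i = 0; i < k; i++) {
            centers.add(shuffled.get(i).clone());          // copy so later updates don't alias the data
        }
        return centers;
    }

    public static void main(String[] args) {
        List<double[]> points = List.of(
            new double[] {0, 0, 0}, new double[] {1, 2, 3},
            new double[] {9, 9, 9}, new double[] {4, 5, 6});
        for (double[] c : pick(points, 2, 42L)) {
            System.out.println(Arrays.toString(c));
        }
    }
}
```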


  6. Summary of Steps
    • Create kmeans input data directory:
      cd $TWISTER_HOME/bin 
      ./twister.sh mkdir kmeans
    • Distribute partitioned input files:
      cd $TWISTER_HOME/bin 
      ./twister.sh put $TWISTER_HOME/samples/kmeans/input kmeans
    • Create partition file:
      cd $TWISTER_HOME/bin 
      ./create_partition_file.sh kmeans kmeans_ $TWISTER_HOME/samples/kmeans/bin/kmeans.pf
    • Execute Twister-Kmeans application:
      cd $TWISTER_HOME/samples/kmeans/bin 
      ./run_kmeans.sh $TWISTER_HOME/samples/kmeans/bin/init_clusters.txt 8 $TWISTER_HOME/samples/kmeans/bin/kmeans.pf

3. Development Guide for Twister-Kmeans

The following sections illustrate how to write the kmeans application with the Twister MapReduce framework. The kmeans application is a little more complex than the Word Count application: it requires multiple iterations of MapReduce computation, and it uses both constant data and variable data across those iterations. Here we describe how to write an iterative MapReduce application with the Twister API and how to configure static and dynamic data in Twister.

Each map task in the Twister-Kmeans application gets a set of 3D data points. These data points do not change over the course of the iterations, and the Map tasks access them in every iteration. Because this data is invariant, it is treated as static data in Twister, so the points are loaded only once for the entire execution. The cluster centers computed in each iteration are dynamic data, since they change from one iteration to the next. The cluster centers calculated in one iteration also serve as the input for the next iteration if they do not yet satisfy the convergence condition.
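To make the static/dynamic split concrete, here is a plain-Java mock of a map task's lifecycle. MapTask, configure() and map() are hypothetical names chosen to echo the Twister code discussed below; this is not the Twister API. The data points are handed over once, while a fresh set of centers arrives with every call, and the counters show how often each happens.

```java
import java.util.Arrays;

/** Mock of a map task that keeps static data and receives dynamic data per iteration. */
final class StaticDynamicSketch {

    static final class MapTask {
        private double[][] points;   // static data: set once, reused in every iteration
        int configureCalls = 0;      // how many times static data was loaded
        int mapCalls = 0;            // how many iterations this task has served

        void configure(double[][] partition) {
            points = partition;
            configureCalls++;
        }

        /** Dynamic data: the current centers arrive with each call; returns points per center. */
        int[] map(double[][] centers) {
            mapCalls++;
            int[] counts = new int[centers.length];
            for (double[] p : points) {
                int best = 0;
                double bestDist = Double.MAX_VALUE;
                for (int c = 0; c < centers.length; c++) {
                    double d = 0;
                    for (int j = 0; j < p.length; j++) {
                        double diff = p[j] - centers[c][j];
                        d += diff * diff;
                    }
                    if (d < bestDist) {
                        bestDist = d;
                        best = c;
                    }
                }
                counts[best]++;
            }
            return counts;
        }
    }

    public static void main(String[] args) {
        MapTask task = new MapTask();
        task.configure(new double[][] {{0, 0, 0}, {1, 1, 1}, {9, 9, 9}});   // once per job
        double[][][] centersPerIteration = {
            {{0, 0, 0}, {10, 10, 10}},
            {{9, 9, 9}, {0, 0, 0}}};
        for (double[][] centers : centersPerIteration) {
            System.out.println(Arrays.toString(task.map(centers)));        // new dynamic data each time
        }
        System.out.println("configure calls: " + task.configureCalls + ", map calls: " + task.mapCalls);
    }
}
```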

  1. Prerequisites
    • Experience with Java or a similar object-oriented programming language.
    • Knowledge of the MapReduce programming model.

  2. Related Code Files

    The source code for Twister-Kmeans is located in $TWISTER_HOME/samples/kmeans/src . The following source files are analysed in the sections below.

    • cgl.imr.samples.kmeans.KMeansClustering.java
    • cgl.imr.samples.kmeans.KMeansMapTask.java
  3. Code Analysis
    1. Implementing the kmeans clustering (cgl.imr.samples.kmeans.KMeansClustering.java)
                      JobConf jobConf = new JobConf("kmeans-map-reduce");
                      jobConf.setMapperClass(KMeansMapTask.class);
                      jobConf.setReducerClass(KMeansReduceTask.class);
                      jobConf.setCombinerClass(KMeansCombiner.class);
                      jobConf.setNumMapTasks(numMapTasks);
                      jobConf.setNumReduceTasks(numReducers);
      
                      TwisterDriver driver = new TwisterDriver(jobConf);
                      driver.configureMaps(partitionFile);
      
                      double totalError = 0;
                      int loopCount = 0;
                      TwisterMonitor monitor = null;
      
                      boolean complete = false;
                      for (loopCount = 0; loopCount < MAX_LOOPS; loopCount++) {
                          monitor = driver.runMapReduceBCast(cData);
                          monitor.monitorTillCompletion(); 
                          DoubleVectorData newCData = ((KMeansCombiner) driver.getCurrentCombiner()).getResults();
                          totalError = getError(cData, newCData);
                          cData = newCData;
                          if (totalError < THRESHOLD) {
                              complete = true;
                              break;
                          }
                      } //for
                      
      Lines 1-6:
      Configure the JobConf object for the MapReduce computation: the mapper, reducer, and combiner classes, and the numbers of map and reduce tasks.
      Line 8:
      Creates a TwisterDriver object for this job configuration.
      Line 9:
      Assigns the partition file that is used to configure the static data for the map tasks.
      Lines 16 and 22:
      Define the termination conditions of the loop. The kmeans application repeats the MapReduce computation for at most MAX_LOOPS iterations and stops early once the error between consecutive sets of cluster centers falls below THRESHOLD.
      Line 17:
      Uses the driver to run the MapReduce computation. The DoubleVectorData object cData holds the current centroid points, which must be sent to all the map tasks in each iteration. This is the dynamic data mentioned previously. Here we use the runMapReduceBCast() API to broadcast the dynamic data across the compute nodes. Currently, Twister provides three methods to launch a MapReduce computation.
      1. runMapReduceBCast(Value val)
        This method broadcasts the same Value to every map task. We have used this method in Twister-Kmeans.
      2. runMapReduce(List<KeyValuePair> pairs)
        It divides the key-value pairs into several groups so that each Map task handles a subgroup of the pairs. You can use this method when each map task needs a different portion of the variable data.
      3. runMapReduce()
        This method does not scatter any variable data to the map tasks; the map tasks process only the static data read from local disk.
      Lines 18-21:
      Wait for the iteration to complete, collect the new cluster centers from the combiner, compute the error between the previous and the new centers with getError(), and make the new centers current.
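runMapReduce(List<KeyValuePair> pairs) divides the pairs into groups, one per map task. This guide does not state how Twister forms those groups, so the sketch below simply assumes an even, contiguous split. Map.Entry stands in for Twister's KeyValuePair, and ScatterSketch.split() is a hypothetical helper, not part of the Twister API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ScatterSketch {

    /** Splits items into numGroups contiguous groups whose sizes differ by at most one. */
    static <T> List<List<T>> split(List<T> items, int numGroups) {
        if (numGroups <= 0) {
            throw new IllegalArgumentException("numGroups must be positive");
        }
        List<List<T>> groups = new ArrayList<>(numGroups);
        int base = items.size() / numGroups;
        int extra = items.size() % numGroups;   // the first 'extra' groups get one more item
        int start = 0;
        for (int g = 0; g < numGroups; g++) {
            int size = base + (g < extra ? 1 : 0);
            groups.add(new ArrayList<>(items.subList(start, start + size)));
            start += size;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            pairs.add(Map.entry("key" + i, i));
        }
        List<List<Map.Entry<String, Integer>>> groups = split(pairs, 3);
        for (int g = 0; g < groups.size(); g++) {
            System.out.println("map task " + g + " <- " + groups.get(g));
        }
    }
}
```

Ten pairs over three map tasks yields groups of 4, 3 and 3 pairs.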
    2. Implementing the map task segment (cgl.imr.samples.kmeans.KMeansMapTask.java)
                          public void configure(JobConf jobConf, MapperConf mapConf) throws TwisterException {
                              this.vectorData = new DoubleVectorData();
                              this.fileData = (FileData) mapConf.getDataPartition();
                              try {
                                  vectorData.loadDataFromTextFile(fileData.getFileName());
                              } catch (Exception e) {
                                  throw new TwisterException(e); 
                              }
                          } //configure
                      
      Line 2:
      Creates the DoubleVectorData object vectorData, which is used to store the static data.
      Line 3:
      Obtains the FileData object fileData, which describes the partitioned file assigned to this map task. The map task can access that file from its local disk.
      Line 5:
      Loads the values from that partitioned file into vectorData. Any exception raised while loading is wrapped in a TwisterException (Line 7).
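loadDataFromTextFile() is a Twister method, and this guide does not describe the file format it expects. The sketch below shows the same load-once step in plain Java under an assumed format of one point per line with whitespace-separated coordinates; the real sample input may differ (for example, it may start with a header line). PointLoaderSketch is a hypothetical name.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PointLoaderSketch {

    /** Reads one point per non-blank line, coordinates separated by whitespace (assumed format). */
    static List<double[]> load(BufferedReader in, int dim) throws IOException {
        List<double[]> points = new ArrayList<>();
        String line;
        int lineNo = 0;
        while ((line = in.readLine()) != null) {
            lineNo++;
            line = line.trim();
            if (line.isEmpty()) {
                continue;
            }
            String[] tokens = line.split("\\s+");
            if (tokens.length != dim) {
                throw new IOException("Line " + lineNo + ": expected " + dim + " values, got " + tokens.length);
            }
            double[] p = new double[dim];
            for (int j = 0; j < dim; j++) {
                p[j] = Double.parseDouble(tokens[j]);
            }
            points.add(p);
        }
        return points;
    }

    public static void main(String[] args) throws IOException {
        String sample = "0.1 0.2 0.3\n1.0 2.0 3.0\n";
        for (double[] p : load(new BufferedReader(new StringReader(sample)), 3)) {
            System.out.println(Arrays.toString(p));
        }
    }
}
```

Inside a map task, one would pass Files.newBufferedReader(Paths.get(fileName)) instead of the StringReader, where fileName stands for the path reported by the FileData object.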

      Note. The programmer needs to implement serialization support for any data type used to transfer data between compute nodes. The DoubleVectorData class is a predefined serializable data type in Twister. If a custom data type is necessary, the programmer can implement the cgl.imr.base.Value interface in Twister.
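The exact method signatures of cgl.imr.base.Value are not given in this guide, so the class below deliberately does not implement that interface. It only illustrates the kind of byte-level round trip a custom data type needs, using hypothetical getBytes() and fromBytes() methods built on DataOutputStream and DataInputStream. Check the Value interface in the Twister source for the contract you actually have to satisfy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical custom data type: a set of equal-length points with byte serialization. */
final class PointSetValue {
    private double[][] points = new double[0][0];

    PointSetValue() {}                          // empty instance to be filled by fromBytes()

    PointSetValue(double[][] points) {
        this.points = points;
    }

    double[][] getPoints() {
        return points;
    }

    /** Layout: point count, dimension, then all coordinates row by row. */
    byte[] getBytes() throws IOException {
        int dim = points.length == 0 ? 0 : points[0].length;
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bout)) {
            out.writeInt(points.length);
            out.writeInt(dim);
            for (double[] p : points) {
                if (p.length != dim) {
                    throw new IllegalArgumentException("all points must have the same dimension");
                }
                for (double v : p) {
                    out.writeDouble(v);
                }
            }
        }                                       // closing flushes everything into bout
        return bout.toByteArray();
    }

    /** Rebuilds the points from bytes produced by getBytes(). */
    void fromBytes(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int n = in.readInt();
            int dim = in.readInt();
            points = new double[n][dim];
            for (int i = 0; i < n; i++) {
                for (int j = 0; j < dim; j++) {
                    points[i][j] = in.readDouble();
                }
            }
        }
    }
}
```

A receiving node would create an empty PointSetValue and call fromBytes() on the bytes the sender produced with getBytes().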