User Guide

  1. Adding the Monitoring Interface
  2. Configuring Storage Credentials
  3. Implementing the Mapper
  4. Implementing the Reducer
  5. Preparing the MapReduce Worker Role
  6. Run/Debug/Deploy
  7. Client API

Create a new Cloud Project

  • Start Visual Studio 2010 with the Azure SDK installed. (If you want to use the local development fabric, run Visual Studio as an administrator.) MapReduceRoles for Azure currently supports version 1.3 of the Azure SDK.
  • Create a new Cloud project. (File-> New -> Project -> Visual C# -> Cloud -> Windows Azure Cloud Service).
  • In the “New Cloud Service” dialog box, add a Worker Role to the right pane (from “.NET Framework 4 Roles” to “Cloud Service Solution”). Optionally, add a Web Role to the same pane for the MapReduce monitoring interface.

Adding the Monitoring Interface

  • Right-click the solution in the Solution Explorer -> Add -> Existing Project. Then browse to the AzureMRUI project that comes with the “MapReduceRoles for Azure” distribution.
  • In the CloudService project, right-click Roles -> WebRole and then select Associate With -> Web Role Project in Solution -> AzureMRUI.
  • Right-click the empty WebRole project in the solution and remove it.

Configuring Storage Credentials

  • Right click again on a Role under “Roles” in the CloudService project and select Properties. Go to the “Settings” tab and click “Add Setting”. Name the setting “DataConnectionString” and select “ConnectionString” as the type. Click “...” in the value column and select “Use Development Storage”. You can instead enter the credentials of a real Azure Storage Account, for example when you are ready to deploy your application to the Azure cloud.
  • Add the same setting for both the Worker Role and the Web Role.
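
These steps write the connection string into the cloud project’s ServiceConfiguration.cscfg file. A sketch of the resulting entry for development storage follows; the role name “WorkerRole1”, the instance count, and the real-account values are placeholders:

```xml
<Role name="WorkerRole1">
  <Instances count="1" />
  <ConfigurationSettings>
    <!-- Development storage -->
    <Setting name="DataConnectionString" value="UseDevelopmentStorage=true" />
    <!-- For a real storage account, the value takes the form:
         DefaultEndpointsProtocol=https;AccountName=YOUR_ACCOUNT;AccountKey=YOUR_KEY -->
  </ConfigurationSettings>
</Role>
```

If you ever edit the .cscfg file by hand, note that the setting must also be declared in the matching ConfigurationSettings section of ServiceDefinition.csdef; Visual Studio keeps the two in sync automatically.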

Implementing the Mapper

  • Add a new C# class to the WorkerRole project. Import "AzureMRCore" and "AzureMRCore.DataTypes" in the new class. Extend the new class from "MapWorker<INKEY, INVALUE, OUTKEY, OUTVALUE>", using the appropriate types for the generic type parameters.
  • Implement the Map method.
  • protected override int Map(IntKey key, StringValue value, string programArgs, IOutputCollector<StringKey, IntValue> outputCollector){....}

Eg: WordCount Mapper

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using AzureMRCore;
using AzureMRCore.DataTypes;

namespace AzureMRWorker
{
    class WordCountMapper : MapWorker<IntKey, StringValue, StringKey, IntValue>
    {

        protected override int Map(IntKey key, StringValue value, string programArgs, IOutputCollector<StringKey, IntValue> outputCollector)
        {
            string line = value.GetTextValue();
            string[] words = line.Split(' ');
            foreach (string word in words)
            {                
                outputCollector.Collect(StringKey.GetInstance(word), IntValue.GetInstance(1));
            }
            return 0;
        }
    }
}

Implementing the Reducer

  • Add a new C# class to the WorkerRole project. Import "AzureMRCore" and "AzureMRCore.DataTypes" in the new class. Extend the new class from "ReduceWorker<INKEY, INVALUE, OUTKEY, OUTVALUE>", using the appropriate types for the generic type parameters.
  • Make sure the INKEY and INVALUE types of the Reducer implementation are the same as the OUTKEY and OUTVALUE types of the Mapper implementation.
  • Implement the Reduce method.
  • public override int Reduce(StringKey key, List<IntValue> values, string programArgs, IOutputCollector<StringKey, IntValue> outputCollector){....}

Eg: WordCount Reducer

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using AzureMRCore;
using AzureMRCore.DataTypes;

namespace AzureMRWorker
{
    class WordCountReducer : ReduceWorker<StringKey, IntValue, StringKey, IntValue>
    {
        public override int Reduce(StringKey key, List<IntValue> values, string programArgs, IOutputCollector<StringKey, IntValue> outputCollector)
        {
            int count = 0;
            foreach (IntValue value in values)
            {
                count += value.value;
            }
            IntValue outValue = new IntValue();
            outValue.value = count;
            outputCollector.Collect(key, outValue);
            return 0;
        }
    }
}

Preparing the MapReduce Worker Role

  • Right click again on the WorkerRole under “Roles” in the CloudService project and select Properties. Go to the “Local Storage” tab and add a local storage by clicking “Add Local Storage”. Name it “temp” and give it an appropriate size based on your application and the VM size you are going to use.
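
This Local Storage entry is stored in the ServiceDefinition.csdef file of the cloud project. A sketch of the resulting fragment, where the role name and the 1024 MB size are only examples:

```xml
<WorkerRole name="WorkerRole1">
  <LocalResources>
    <!-- Scratch space used by the MapReduce workers;
         size it for your data volume and the chosen VM size -->
    <LocalStorage name="temp" cleanOnRoleRecycle="true" sizeInMB="1024" />
  </LocalResources>
</WorkerRole>
```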

  • Add AzureMrCore.dll as a reference to the WorkerRole.
    WorkerRole->References (right click) -> Add Reference -> Browse -> Select the AzureMRCore.dll (in the “lib” dir of the distribution)
  • Delete the auto-generated WorkerRole.cs file under the WorkerRole project.
  • Add a new C# class to the project.
    WorkerRole project (right click) -> Add -> New Item -> Class
  • Import the AzureMRCore namespace (add “using AzureMRCore;” at the top of the new class).
  • Extend the new class from AzureMRWorkerRole class and override the “ConfigMapRed” method. Perform the necessary configurations inside that method.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using AzureMRCore;
using AzureMRCore.InputFormat;

namespace AzureMRWorker
{
    class WordCountMR : AzureMRWorkerRole
    {
        public override Boolean ConfigMapRed(MRConf mrConf)
        {
            mrConf.mapperType = typeof(WordCountMapper);
            mrConf.reducerType = typeof(WordCountReducer);
            mrConf.combinerType = typeof(WordCountReducer);

            mrConf.MapWorkersPerInstance = 4;
            mrConf.ReduceWorkersPerInstance = 4;
            mrConf.mapTimeOut = 360;
            mrConf.reduceTimeOut = 7200;

            mrConf.mapSchedQueueName = "mapsched4";
            mrConf.reduceSchedQueueName = "reducesched4";
            mrConf.inputFormatType = typeof(LineInputFormat);            
            mrConf.partitionerType = typeof(HashPartitioner);
            return true;            
        }
    }
}				

Running/Debugging

  • Local: Assuming you ran Visual Studio 2010 as administrator (required for deploying to the development fabric), simply run or debug the solution in Visual Studio. The web-based monitoring console will open in a browser window.
  • Deploying in Azure Cloud
    Follow the MSDN tutorials on deploying Azure applications from Visual Studio.

Client API

NOTE: You need to make sure to provide the same queue names in your service deployment as well as in the client program.

  • Creating tasks using files in a BLOB container
AzureMRCore.Client.ClientUtils.processMapRed
	(string inputBlobContainer, string jobid, string programParams, int numReduceTasks, string outputContainer,
            string mapQueueName, string reduceQueueName, CloudStorageAccount storageAccount)

"programParams" can be used to pass an optional program parameter to all the Map and Reduce tasks. "inputBlobContainer" should contain the files that need to be processed. The output will be stored in "outputContainer".

  • Creating custom map tasks
AzureMRCore.Client.ClientUtils.processMapRed
	(List<string> mapTasks, string jobid, string programParams, int numReduceTasks, string outputContainerName,
            string mapQueueName, string reduceQueueName, CloudStorageAccount storageAccount)

Provide a list of string parameters, one per map task. Each map task will receive its string as both the key and the value (support for distinct keys and values is in progress).
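
For example, a sketch of submitting three custom map tasks with this overload, reusing the queue names from the worker-role configuration shown earlier; the job id, task strings, and output container name are made up for illustration:

```csharp
using System.Collections.Generic;
using Microsoft.WindowsAzure;
using AzureMRCore.Client;

// Each string becomes one map task and is delivered to it as both key and value.
List<string> mapTasks = new List<string> { "input-part-1", "input-part-2", "input-part-3" };

CloudStorageAccount storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
ClientUtils.processMapRed(mapTasks, "customjob1", "", 2, "customoutput",
                          "mapsched4", "reducesched4", storageAccount);
ClientUtils.waitForCompletion("customjob1", storageAccount, 500);
```

The queue names ("mapsched4", "reducesched4") must match the mapSchedQueueName and reduceSchedQueueName values configured in the worker role's ConfigMapRed method.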

  • Waiting for completion

After submitting the job using the above API, you can optionally wait for the completion of the job using the following method.

AzureMRCore.Client.ClientUtils.waitForCompletion(string jobid, CloudStorageAccount storageAccount, int sleepTime)

"sleepTime" is the polling interval that will be used to poll for the status of the job.

Eg: WordCount Client

CloudStorageAccount storageAccount = CloudStorageAccount.DevelopmentStorageAccount;
string jobID = "wctest1";
string inputContainer = "wcinput";
string outputContainer = "wcoutput";
int numReduceTasks = 2;
ClientUtils.processMapRed(inputContainer, jobID, "", numReduceTasks, outputContainer, "mapsched4", "reducesched4", storageAccount);
ClientUtils.waitForCompletion(jobID, storageAccount, 500);