Using MapReduce in Cache
Note
This feature is only available in the NCache Enterprise Edition.
In order to use the MapReduce, initialize the MapReduce Task. Set the mapper, combiner and reducer factory to the task and then execute task on the cache.
Prerequisites
- To learn about the standard prerequisites required to work with all NCache server-side features please refer to the given page on Server Side API Prerequisites.
- For API details refer to: ICache, MapReduceTask, Combiner, Mapper, Reducer, ITrackableTask, GetResult, ITaskResult, IExecutionService, ExecuteTask, GetEnumerator.
The following example shows the sample usage of the MapReduce.
try
{
// Pre-condition: Cache is already connected
// Initialize MapReduce Task
MapReduceTask task = new MapReduceTask();
// Set Mapper, Combiner factory and Reducer factory
task.Mapper = new ProductCountMapper();
task.Combiner = new ProductCountCombinerFactory();
task.Reducer = new ProductCountReducerFactory();
// Execute task on cache
ITrackableTask wordCount = cache.ExecutionService.ExecuteTask(task, new ProductCountKeyFilter());
// Get result
ITaskResult result = wordCount.GetResult();
IDictionaryEnumerator enumResult = result.GetEnumerator();
while (enumResult.MoveNext())
{
// Perform operations
}
}
catch(OperationFailedException ex)
{
if (ex.ErrorCode == NCacheErrorCodes.TASK_IN_SUBMISSION_FAILED_NODE)
{
// Task is failed during submission
// Make sure that the mapper is defined
}
else if (ex.ErrorCode == NCacheErrorCodes.TASK_STARTING_FAILED)
{
// Task is failed while starting
// Make sure that the mapper is defined
}
else if (ex.ErrorCode == NCacheErrorCodes.TASK_CANT_BE_SUBMITTED)
{
// The limit of tasks to be submitted has been reached
// No more task can be submitted
}
else if (ex.ErrorCode == NCacheErrorCodes.TASK_CANCELLED_REASON)
{
// Task was cacelled and could not be submitted
}
else if (ex.ErrorCode == NCacheErrorCodes.TASK_SUBMISSION_FAILED)
{
// Task has failed to be submitted
}
else
{
// Exception can occur due to:
// Connection Failures
// Operation Timeout
// Operation performed during state transfer
}
}
catch (Exception ex)
{
// Any generic exception like ArgumentNullException or ArgumentException
}
See Also
Sample Implementation of MapReduce
Aggregator
Entry Processor
Configure MapReduce