MapReduce Aggregator Implementation and Usage [Deprecated]
The NCache Aggregator is a high-performance data processing component built on the MapReduce framework, designed to perform analytical tasks in parallel across a cache cluster. As a specialized MapReduce task, the Aggregator is optimized for mathematical functions such as addition, averaging, and counting. Because the logic executes directly on the data nodes and only the aggregated results are returned, it significantly reduces latency and network traffic compared to fetching the data and processing it on the client. This page provides a step-by-step guide on implementing and using the MapReduce Aggregator with NCache.
Prerequisites
Before using the NCache MapReduce Aggregator, ensure that the following prerequisites are fulfilled:
- Install either of the following NuGet packages in your application:
- Enterprise: Alachisoft.NCache.SDK
- OpenSource: Alachisoft.NCache.Opensource.SDK
- Include the following namespaces in your application based on the feature you intend to use:
- The cache must be running.
- Make sure that the data being added is serializable.
- All assemblies, including dependent assemblies, must be deployed.
- The implementation classes must be built as a Class Library (.dll) in Visual Studio, which is then deployed on the NCache cluster.
- For API details, refer to: ICache, IAggregator, IValueExtractor, Aggregate, AggregateAll, Extract, IExecutionService.
The Aggregator and Value Extractor logic is implemented in classes that implement the IAggregator and IValueExtractor interfaces, respectively. These implementations contain the logic executed over the cache items on the server side. Once the implementation is deployed on NCache, you can invoke the Aggregator from your client application to execute this logic on the server.
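To make the division of work concrete, the following minimal sketch simulates one aggregation call in plain C# without the NCache SDK. It assumes that Aggregate() receives the extracted values of one node and that AggregateAll() receives the per-node results. The interfaces ILocalExtractor and ILocalAggregator, the classes AggregationSimulator, IntPassThrough, and IntMin, and the list-per-node layout are hypothetical stand-ins for illustration only; they mirror the shape of IValueExtractor and IAggregator but are not NCache types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins: NOT the Alachisoft.NCache.Runtime.Aggregation types
public interface ILocalExtractor
{
    object Extract(object value);
}

public interface ILocalAggregator
{
    object Aggregate(object value);    // per node, over extracted values
    object AggregateAll(object value); // once, over the per-node results
}

public static class AggregationSimulator
{
    // Each inner list stands for the cache items held by one cluster node
    public static object Run(IEnumerable<IList<object>> nodes,
                             ILocalExtractor extractor,
                             ILocalAggregator aggregator)
    {
        var partials = new List<object>();
        foreach (IList<object> nodeItems in nodes)
        {
            // Mapper-like step: extract values next to the data
            List<object> extracted = nodeItems.Select(item => extractor.Extract(item)).ToList();
            // Combiner-like step: reduce this node's values locally
            partials.Add(aggregator.Aggregate(extracted));
        }
        // Reducer-like step: only the small partial results are combined
        return aggregator.AggregateAll(partials);
    }
}

// Keeps ints, maps everything else to null
public class IntPassThrough : ILocalExtractor
{
    public object Extract(object value) => value is int ? value : null;
}

// Same minimum logic at both stages; nulls and non-ints are skipped
public class IntMin : ILocalAggregator
{
    public object Aggregate(object value) => Min(value);
    public object AggregateAll(object value) => Min(value);

    private static object Min(object value)
    {
        int min = int.MaxValue;
        foreach (object item in (IEnumerable<object>)value)
        {
            if (item is int i && i < min)
            {
                min = i;
            }
        }
        return min;
    }
}
```

For example, running AggregationSimulator.Run over two simulated nodes holding { 5, 3, "x" } and { 8, 1 } with IntPassThrough and IntMin yields 1: each node reports its local minimum (3 and 1), and only those two values are combined in the final step.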
Step 1: Implement IAggregator Interface
Your custom logic is provided in the Aggregate() and AggregateAll() methods of the IAggregator implementation. Aggregate() applies the aggregation operation locally on each node, much like a Combiner. Use it to combine a node's values before they are sent on for further processing in the Reduce phase.
AggregateAll() applies the aggregation operation in the Reduce phase, combining the per-node results of Aggregate() into the final result. The following sample implementation returns the minimum or maximum of the integer values it receives.
Important
Once implemented, deploy this class on NCache by referring to Deploy Providers in the Administrator’s Guide.
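using System;
using System.Collections;
using System.Linq;
using Alachisoft.NCache.Runtime.Aggregation;

[Serializable]
public class IntAggregator : IAggregator
{
    private readonly string function;

    public IntAggregator(string function)
    {
        this.function = function;
    }

    // Runs on each node (like a Combiner) over that node's extracted values
    public object Aggregate(object value)
    {
        return Calculate(value);
    }

    // Runs in the Reduce phase over the per-node results of Aggregate()
    public object AggregateAll(object value)
    {
        return Calculate(value);
    }

    // 'value' is treated as a collection; nulls and non-int entries are skipped
    private object Calculate(object value)
    {
        var numbers = ((IEnumerable)value).OfType<int>();
        switch (function)
        {
            case "MIN":
                // Start from the largest int so any real value lowers it
                int min = int.MaxValue;
                foreach (int n in numbers) { if (n < min) min = n; }
                return min;
            case "MAX":
                // Start from the smallest int so any real value raises it
                int max = int.MinValue;
                foreach (int n in numbers) { if (n > max) max = n; }
                return max;
            default:
                return 0;
        }
    }
    // This class is to be deployed on cache
}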
Note
To ensure the operation is fail-safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
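Because Aggregate() and AggregateAll() run at different stages, the value returned by Aggregate() must be something the Reduce phase can still combine correctly. For MIN and MAX, as in IntAggregator, the same calculation works at both stages, but an average does not: averaging per-node averages gives the wrong answer when nodes hold different numbers of items. The following sketch uses a hypothetical IntAverageSketch class that does not reference the NCache SDK; it returns a (sum, count) pair per node and divides only once at the end.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical average aggregator: Aggregate() returns a mergeable
// (sum, count) partial instead of a per-node average.
public class IntAverageSketch
{
    // Per-node step: summarize this node's int values as (sum, count)
    public object Aggregate(object value)
    {
        long sum = 0;
        long count = 0;
        foreach (object item in (IEnumerable<object>)value)
        {
            if (item is int i)
            {
                sum += i;
                count++;
            }
        }
        return (Sum: sum, Count: count);
    }

    // Reduce step: merge all partials first, then divide once
    public object AggregateAll(object value)
    {
        long sum = 0;
        long count = 0;
        foreach (object item in (IEnumerable<object>)value)
        {
            var (s, c) = ((long Sum, long Count))item;
            sum += s;
            count += c;
        }
        return count == 0 ? 0.0 : (double)sum / count;
    }
}
```

With nodes holding { 1, 2, 3 } and { 10 }, the per-node averages are 2 and 10, whose average is 6, while the merged (sum, count) of (16, 4) yields the correct result of 4. In a real deployment the partial result travels between nodes, so its type would need to be serializable.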
Step 2: Implement IValueExtractor Interface
Your custom logic is provided in the Extract() method, which extracts meaningful information or attributes from each cached object, much like the Mapper does in MapReduce. The returned value may also be null. The following sample implementation passes integer values through and returns null for everything else.
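using System;
using Alachisoft.NCache.Runtime.Aggregation;

[Serializable]
public class BasicValueExtractor : IValueExtractor
{
    // Runs on the node that holds each item, like a Mapper
    public object Extract(object value)
    {
        // Pass integers through unchanged
        if (value is int)
        {
            return value;
        }
        // Return null for anything else (including null items) instead of
        // a double such as 0.0, which an int aggregator cannot cast to int
        return null;
    }
    // This class is to be deployed in cache
}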
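Extraction is most useful when cached items are custom objects and only one attribute feeds the aggregation. The following sketch assumes a hypothetical Product class with a UnitsInStock property and a hypothetical UnitsInStockExtractorSketch class; to stay self-contained it does not implement the NCache IValueExtractor interface, although a deployed extractor would.

```csharp
using System;

// Hypothetical cached type used only for this sketch
[Serializable]
public class Product
{
    public string Name { get; set; }
    public int UnitsInStock { get; set; }
}

// Like a Mapper, Extract() turns a whole cached object into the
// single attribute the aggregator needs.
public class UnitsInStockExtractorSketch
{
    public object Extract(object value)
    {
        // Project Product items onto their stock count
        if (value is Product product)
        {
            return product.UnitsInStock;
        }
        // Items without the attribute (or null items) map to null
        return null;
    }
}
```

An int aggregator that skips null values could then compute the minimum or maximum stock level across the cluster without moving whole Product objects off the nodes.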
Step 3: Deploy Implementations on the Cache
Deploy these custom implementations on NCache by referring to Deploy Providers in the Administrator’s Guide.
Step 4: Use Aggregator in the Application
Once the interfaces are implemented and deployed on the cache, you can execute the Aggregator using the Aggregate method in your client application.
The following code sample assumes that the cache is connected and already holds integer data. It invokes the custom IntAggregator to find the minimum and maximum values, and a built-in aggregator to compute their sum.
try
{
    // Precondition: the cache is already connected
    // and contains integer data

    // Minimum value, computed by the custom IntAggregator
    int minValue = (int)cache.ExecutionService.Aggregate(new BasicValueExtractor(), new IntAggregator("MIN"));

    // Maximum value, computed by the custom IntAggregator
    int maxValue = (int)cache.ExecutionService.Aggregate(new BasicValueExtractor(), new IntAggregator("MAX"));

    // Sum, computed by the built-in aggregator
    int sum = (int)cache.ExecutionService.Aggregate(new BasicValueExtractor(), BuiltInAggregator.IntegerSum());
}
catch (OperationFailedException ex)
{
    // Exception can occur due to:
    // Connection failures
    // Operation timeout
    // Operation performed during state transfer
}
catch (Exception ex)
{
    // Any generic exception, such as ArgumentNullException or ArgumentException
}
See Also
Aggregator Components and Working
MapReduce
WAN Replication across Multi Datacenters through Bridge
Deploy Providers