Queue Behavior and Usage in Cache
Note
This feature is only available in NCache Enterprise.
A queue is an ordered data structure that follows the First In, First Out (FIFO) principle. You can use queues to maintain information being processed at runtime. For example, the application IDs of potential candidates for a company can be stored in a queue. As the queue is FIFO in nature, the first candidate to apply will be interviewed first, and so on.
NCache further enhances the queue data structure by providing NCache-specific features such as Groups, Tags, Expiration, Locking, Dependencies, and more. In the scenario above, the company can specify a group for the queue of candidates who applied for administrative jobs.
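To make the FIFO ordering concrete, here is a minimal local sketch in Python. It uses the standard library's collections.deque as a stand-in for a cache-backed queue, so it does not call NCache at all, and the application IDs are made-up values chosen for illustration.

```python
from collections import deque

# Local stand-in for a queue of application IDs (hypothetical values).
applications = deque()
for application_id in [1001, 1002, 1003]:
    applications.append(application_id)  # enqueue at the tail

# Candidates are interviewed in the order in which they applied.
interview_order = []
while applications:
    interview_order.append(applications.popleft())  # dequeue from the head

print(interview_order)
```

The item that entered first leaves first, which is the ordering a distributed queue is expected to preserve as well.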
Behavior
- A queue can be of any primitive type or custom object.
- Queues of CacheItem and nested queues are not yet supported.
- Queues are named. Hence, you need to provide a unique cache key for a queue.
- Null is not a supported value type.
- Duplicate values are supported.
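The rules above can be pictured with a small in-memory model. This is only a sketch under stated assumptions: LocalQueueStore and its methods are hypothetical names invented for illustration, not NCache classes, and the choice to raise KeyError or ValueError is an assumption about how such rules might be enforced locally.

```python
from collections import deque


class LocalQueueStore:
    """Hypothetical in-memory model of named queues; not the NCache API."""

    def __init__(self):
        self._queues = {}

    def create_queue(self, key):
        # Each queue is registered under a unique key.
        if key in self._queues:
            raise KeyError(f"A queue named {key!r} already exists")
        self._queues[key] = deque()
        return self._queues[key]

    def enqueue(self, key, value):
        # Null (None) values are rejected; duplicate values are allowed.
        if value is None:
            raise ValueError("None is not a supported value")
        self._queues[key].append(value)


store = LocalQueueStore()
queue = store.create_queue("CandidateQueue")
store.enqueue("CandidateQueue", 1002)
store.enqueue("CandidateQueue", 1002)  # duplicate value is accepted

try:
    store.enqueue("CandidateQueue", None)
    null_rejected = False
except ValueError:
    null_rejected = True
```

In this model the queue ends up holding the same value twice, while the attempt to add None is refused.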
Prerequisites
- To learn about the standard prerequisites for working with all NCache client-side features, refer to Client Side API Prerequisites.
- For .NET API details, refer to: ICache, IDistributedQueue, IDataTypeManager, Contains, CopyTo, ToArray, Peek, GetQueue, Dequeue, Enqueue, CreateQueue, RegisterNotification, DataTypeDataNotificationCallback, EventType, DataTypeEventDataFilter, Lock, Unlock.
- For Java API details, refer to: Cache, DistributedQueue, createQueue, getQueue, contains, lock, unlock, EventType, getEventType, addChangeListener, getDataStructuresManager, onDataStructureChanged, DataTypeEventDataFilter, DataStructureDataChangeListener, DataStructureEventArg, getCollectionItem.
- For Node.js API details, refer to: Cache, DistributedQueue, getDataStructuresManager, createQueue, add, getQueue, contains, copyTo, toArray, peek, remove, EventType, DataStructureDataChangeListener, addChangeListener, DataTypeEventDataFilter, DataStructureEventArg, getEventType, getCollectionItem.
- For Python API details, refer to: Cache, DataStructureManager, get_data_structures_manager, DistributedQueue, contains, add, get_iterator, remove, add_change_listener, create_queue, get_queue, get_event_type, DataTypeEventDataFilter, EventDataFilter, DataStructureDataChangeListener, onDataStructureChanged.
Create Queue and Add Data
The following code sample shows how a queue of Candidate type can be created in the cache against the cache key CandidateQueue and how items are added to the queue.
// Precondition: Cache must be connected
// Specify unique cache key for queue
string key = "CandidateQueue";
// Create Queue of Candidate type
IDistributedQueue<Candidate> queue = cache.DataTypeManager.CreateQueue<Candidate>(key);
Candidate[] candidates = FetchCandidates();
foreach (var candidate in candidates)
{
    // Add candidates to queue
    queue.Enqueue(candidate);
}
// Precondition: Cache must be connected
// Specify unique cache key for queue
String key = "CandidateQueue";
// Create Queue of Candidate type
DistributedQueue<Candidate> queue = cache.getDataStructuresManager().createQueue(key, Candidate.class);
Candidate[] candidates = fetchCandidates();
for (var candidate : candidates) {
    // Add candidates to queue
    queue.add(candidate);
}
// This is an async method
// Precondition: Cache must be connected
// Specify unique cache key for queue
var key = "CandidateQueue";
// Create Queue
var manager = await this.cache.getDataStructuresManager();
var queue = await manager.createQueue(key, ncache.JsonDataType.Object);
var candidates = this.fetchCandidates();
// for...of iterates over the candidates themselves (for...in would yield indices)
for (var candidate of candidates) {
    // Add candidates to queue
    await queue.add(candidate);
}
# Precondition: Cache must be connected
# Specify unique cache key for queue
key = "CandidateQueue"
# Create Queue
manager = cache.get_data_structures_manager()
queue = manager.create_queue(key, Candidate)
candidates = fetch_candidates()
for candidate in candidates:
    # Add candidates to queue
    queue.add(candidate)
# Check if a candidate exists against given ID
candidate = get_candidate_by_id(1002)
if queue.contains(candidate):
    # Candidate exists with this ID
    print("Candidate found")
The following code sample fetches a queue from the cache using GetQueue and then checks whether an item exists in the queue using Contains. It also gets the item at the beginning of the queue without removing it using Peek, copies the entire source queue to a target array using CopyTo, and copies the queue into a new one-dimensional array using ToArray.
// Queue with this key already exists in cache
string key = "CandidateQueue";
// Get queue and show items of queue
IDistributedQueue<Candidate> retrievedQueue = cache.DataTypeManager.GetQueue<Candidate>(key);
// Get any candidate by ID
Candidate candidate = GetCandidateByID(1002);
// Check whether the queue contains a particular candidate
if (retrievedQueue.Contains(candidate))
{
    // Candidate exists with this ID
}
// Array of candidates for the queue to be copied into
Candidate[] candidates = new Candidate[retrievedQueue.Count];
// Copy the queue to another array
retrievedQueue.CopyTo(candidates, 0);
// Copy queue to one-dimensional array
candidates = retrievedQueue.ToArray();
// Get the item at the beginning of the queue without removing it
Candidate firstCandidate = retrievedQueue.Peek();
// Precondition: Cache is already connected
// Queue with this key already exists in cache
String key = "CandidateQueue";
// Get queue and show items of queue
DistributedQueue<Candidate> retrievedQueue = cache.getDataStructuresManager().getQueue(key, Candidate.class);
// Get any candidate by ID
Candidate candidate = getCandidateByID(1002);
// Check whether the queue contains a particular candidate
if (retrievedQueue.contains(candidate))
{
    // Candidate exists with this ID
}
// Array of candidates for the queue to be copied into
Candidate[] candidates = new Candidate[retrievedQueue.size()];
// Copy the queue to another array
retrievedQueue.copyTo(candidates, 0);
// Copy queue to one-dimensional array
candidates = retrievedQueue.toArray();
// Get the item at the beginning of the queue without removing it
Candidate firstCandidate = retrievedQueue.peek();
// This is an async method
// Precondition: Cache is already connected
// Queue with this key already exists in cache
var key = "CandidateQueue";
// Get queue and show items of queue
var manager = await this.cache.getDataStructuresManager();
var retrievedQueue = await manager.getQueue(key, ncache.JsonDataType.Object);
// Get any candidate by ID
var candidate = this.getCandidateByID(1002);
// Check whether the queue contains a particular candidate
if (retrievedQueue.contains(candidate))
{
    // Candidate exists with this ID
}
// Array of candidates for the queue to be copied into
var candidates = new Array(retrievedQueue.length);
// Copy the queue to another array
retrievedQueue.copyTo(candidates, 0);
// Copy queue to one-dimensional array
candidates = retrievedQueue.toArray();
// Get the item at the beginning of the queue without removing it
var firstCandidate = retrievedQueue.peek();
# Precondition: Cache is already connected
# Queue with this key already exists in cache
key = "CandidateQueue"
# Get queue and show items of queue
manager = cache.get_data_structures_manager()
retrieved_queue = manager.get_queue(key, Candidate)
if retrieved_queue is not None:
    for item in retrieved_queue.get_iterator():
        # Perform operations
        print(item)
else:
    # Queue does not exist
    print("Queue not found")
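The read-only operations in this section (membership check, peek, and copying to an array) all leave the queue itself unchanged. The following local sketch illustrates that property with Python's collections.deque standing in for the cached queue; it is not NCache code, and the candidate names are hypothetical.

```python
from collections import deque

queue = deque(["Alice", "Bob", "Carol"])  # hypothetical candidate names

# Membership check, analogous to Contains.
has_bob = "Bob" in queue

# Peek: read the head item without removing it.
first = queue[0]

# Snapshot: copy the items into a separate list, analogous to ToArray.
snapshot = list(queue)
snapshot.append("Dave")  # changing the copy does not affect the queue
```

After these steps the queue still holds its original three items in the same order; only the copy has grown.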
Remove Items from Queue
Items can be removed from a queue in a FIFO manner. The following code sample removes the first candidate from the queue.
// Queue with this key already exists in cache
string key = "CandidateQueue";
IDistributedQueue<Candidate> retrievedQueue = cache.DataTypeManager.GetQueue<Candidate>(key);
// Remove first item of queue
retrievedQueue.Dequeue();
// Precondition: Cache must be connected
// Queue with this key already exists in cache
String key = "CandidateQueue";
DistributedQueue<Candidate> retrievedQueue = cache.getDataStructuresManager().getQueue(key, Candidate.class);
// Remove first item of queue
retrievedQueue.remove();
// This is an async method
// Precondition: Cache must be connected
// Queue with this key already exists in cache
var key = "CandidateQueue";
var manager = await this.cache.getDataStructuresManager();
var retrievedQueue = await manager.getQueue(key, ncache.JsonDataType.Object);
// Remove first item of queue
await retrievedQueue.remove();
# Precondition: Cache must be connected
# Queue with this key already exists in cache
key = "CandidateQueue"
manager = cache.get_data_structures_manager()
retrieved_queue = manager.get_queue(key, Candidate)
# Remove first item of queue
retrieved_queue.remove()
Note
To ensure the operation is fail-safe, it is recommended to handle any potential exceptions within your application, as explained in Handling Failures.
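As a rough illustration of defensive removal, the sketch below wraps a dequeue in exception handling. It runs against a local collections.deque, which raises IndexError when empty; the exceptions an actual NCache client raises are different and are not assumed here. The helper name dequeue_or_default is hypothetical.

```python
from collections import deque


def dequeue_or_default(queue, default=None):
    """Remove and return the head item, or return `default` if the queue is empty."""
    try:
        return queue.popleft()
    except IndexError:
        # collections.deque signals an empty queue with IndexError.
        return default


queue = deque([1001])
first = dequeue_or_default(queue)               # removes the only item
second = dequeue_or_default(queue, default=-1)  # queue is empty now
```

The same shape applies to a cache-backed queue: catch the client's own exception types around the remove call and decide whether to retry, skip, or report.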
Event Notifications on Queues
You can register cache events, key-based events, and data structure events on a data structure such as a queue.
For behavior, refer to feature-wise behavior.
The following code sample registers cache-level ItemAdded and ItemUpdated events, and also registers data structure ItemAdded and ItemUpdated events on the queue in the cache. Once a queue is created in the cache, an ItemAdded cache-level event is fired. However, once an item is added to the queue, an ItemAdded data structure event is fired, and an ItemUpdated cache-level event is fired.
Register Event on Queue Created
// Unique cache key for queue
string key = "CandidateQueue";
// Create queue of Candidate type
IDistributedQueue<Candidate> queue = cache.DataTypeManager.CreateQueue<Candidate>(key);
// Register ItemAdded, ItemUpdated, ItemRemoved events on queue created
// DataTypeDataNotificationCallback is the specified callback method
queue.RegisterNotification(DataTypeDataNotificationCallback,
    EventType.ItemAdded | EventType.ItemUpdated | EventType.ItemRemoved,
    DataTypeEventDataFilter.Data);
// Perform operations
// Precondition: Cache is already connected
// Unique cache key for the queue
String key = "CandidateQueue";
// Create queue of Candidate type
DistributedQueue<Candidate> queue = cache.getDataStructuresManager().createQueue(key, Candidate.class);
// Create EnumSet of event types
EnumSet<EventType> enumSet = EnumSet.of(com.alachisoft.ncache.runtime.events.EventType.ItemAdded,
    EventType.ItemUpdated, EventType.ItemRemoved);
// Register ItemAdded, ItemUpdated, ItemRemoved events on queue created
// dataStructureListener is the listener object that implements the callback
queue.addChangeListener(dataStructureListener, enumSet, DataTypeEventDataFilter.Data);
// Perform operations
// This is an async method
// Precondition: Cache is already connected
// Unique cache key for the queue
var key = "CandidateQueue";
// Create queue of candidate type
var manager = await this.cache.getDataStructuresManager();
var queue = await manager.createQueue(key, ncache.JsonDataType.Object);
// Create EnumSet of event types
var enumSet = enumSet.of(
ncache.EventType.ItemAdded,
ncache.EventType.ItemUpdated,
ncache.EventType.ItemRemoved
);
// Register ItemAdded, ItemUpdated, ItemRemoved events on queue created
// dataStructureListener is the listener object that holds the callback
queue.addChangeListener(
    this.dataStructureListener,
    enumSet,
    ncache.DataTypeEventDataFilter.Data
);
// Perform operations
def datastructure_callback_function(collection_name, collection_event_args):
    # Perform Operations
    print("Event Fired for " + str(collection_name))
# Precondition: Cache is already connected
# Unique cache key for queue
key = "CandidateQueue"
# Create queue
queue = cache.get_data_structures_manager().create_queue(key, Candidate)
# Register ItemAdded, ItemUpdated, ItemRemoved events on queue created
events_list = [ncache.EventType.ITEM_ADDED, ncache.EventType.ITEM_UPDATED, ncache.EventType.ITEM_REMOVED]
queue.add_change_listener(datastructure_callback_function, events_list, ncache.DataTypeEventDataFilter.DATA)
Specify Callback for Event Notification
private void DataTypeDataNotificationCallback(string collectionName, DataTypeEventArg collectionEventArgs)
{
    switch (collectionEventArgs.EventType)
    {
        case EventType.ItemAdded:
            // Item has been added to the collection
            break;
        case EventType.ItemUpdated:
            if (collectionEventArgs.CollectionItem != null)
            {
                // Item has been updated in the collection
                // Perform operations
            }
            break;
        case EventType.ItemRemoved:
            // Item has been removed from the collection
            break;
    }
}
DataStructureDataChangeListener dataStructureListener = new DataStructureDataChangeListener() {
    @Override
    public void onDataStructureChanged(String collection, DataStructureEventArg dataStructureEventArg) {
        switch (dataStructureEventArg.getEventType()) {
            case ItemAdded:
                // Item has been added to the collection
                break;
            case ItemUpdated:
                if (dataStructureEventArg.getCollectionItem() != null) {
                    // Item has been updated in the collection
                    // Perform operations
                }
                break;
            case ItemRemoved:
                // Item has been removed from the collection
                break;
        }
    }
};
dataStructureListener = new ncache.DataStructureDataChangeListener();
{
function onDataStructureChanged(collection, dataStructureEventArg) {
switch (dataStructureEventArg.getEventType()) {
case ncache.EventType.ItemAdded:
//Item has been added to the collection
break;
case ncache.EventType.ItemUpdated:
if (dataStructureEventArg.getCollectionItem() != null) {
//Item has been updated in the collection
// perform operations
}
break;
case ncache.EventType.ItemRemoved:
//Item has been removed from the collection
break;
}
}
}
def datastructure_callback_function(collection_name: str, collection_event_args: DataStructureEventArg):
    if collection_event_args.get_event_type() is ncache.EventType.ITEM_ADDED:
        # Item has been added to the collection
        print("Item added in " + collection_name)
    elif collection_event_args.get_event_type() is ncache.EventType.ITEM_UPDATED:
        # Item has been updated in the collection
        print("Item updated in " + collection_name)
    elif collection_event_args.get_event_type() is ncache.EventType.ITEM_REMOVED:
        # Item has been removed from the collection
        print("Item removed from " + collection_name)
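The register-then-callback flow described in this section follows the observer pattern. The sketch below shows that pattern locally so the filtering by event type is easy to see. ObservableQueue and this EventType enum are hypothetical stand-ins defined only for the example; they are not the NCache classes of the same or similar names.

```python
from collections import deque
from enum import Enum


class EventType(Enum):
    ITEM_ADDED = "ItemAdded"
    ITEM_REMOVED = "ItemRemoved"


class ObservableQueue:
    """Hypothetical local queue that notifies listeners; not the NCache API."""

    def __init__(self, name):
        self.name = name
        self._items = deque()
        self._listeners = []  # list of (callback, set of subscribed event types)

    def add_change_listener(self, callback, event_types):
        self._listeners.append((callback, set(event_types)))

    def _notify(self, event_type, item):
        # Deliver the event only to listeners subscribed to this event type.
        for callback, subscribed in self._listeners:
            if event_type in subscribed:
                callback(self.name, event_type, item)

    def add(self, item):
        self._items.append(item)
        self._notify(EventType.ITEM_ADDED, item)

    def remove(self):
        item = self._items.popleft()
        self._notify(EventType.ITEM_REMOVED, item)
        return item


received = []
queue = ObservableQueue("CandidateQueue")
# Subscribe only to ITEM_ADDED; removals will not reach this callback.
queue.add_change_listener(
    lambda name, event, item: received.append((name, event, item)),
    [EventType.ITEM_ADDED],
)
queue.add(1001)
queue.remove()
```

Because the listener subscribed only to additions, it records one event even though two operations were performed on the queue.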
Locking Queues
Queues can be explicitly locked and unlocked to ensure data consistency. The following code sample fetches an existing queue, locks it for a period of 10 seconds using Lock, and then unlocks it using Unlock.
// Queue already exists in the cache with the key "CandidateQueue"
string key = "CandidateQueue";
// Get queue
IDistributedQueue<Candidate> queue = cache.DataTypeManager.GetQueue<Candidate>(key);
// Lock queue for 10 seconds
bool isLocked = queue.Lock(TimeSpan.FromSeconds(10));
if (isLocked)
{
    // Queue is successfully locked for 10 seconds
    // Unless explicitly unlocked
}
else
{
    // Queue is not locked because either:
    // Queue is not present in the cache
    // Queue is already locked
}
queue.Unlock();
// Precondition: Cache is already connected
// Queue already exists in the cache with the key "CandidateQueue"
String key = "CandidateQueue";
// Get queue
DistributedQueue<Candidate> queue = cache.getDataStructuresManager().getQueue(key, Candidate.class);
// Lock queue for 10 seconds
boolean isLocked = queue.lock(TimeSpan.FromSeconds(10));
if (isLocked) {
    // Queue is successfully locked for 10 seconds
    // Unless explicitly unlocked
} else {
    // Queue is not locked because either:
    // Queue is not present in the cache
    // Queue is already locked
}
queue.unlock();
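The lock-with-timeout behavior in the comments above (a lock succeeds once, is refused while held, and lapses after its duration unless explicitly released) can be sketched with a small local class. TimedLock is a hypothetical illustration, not an NCache type, and the injectable clock is an assumption made so the example runs deterministically without sleeping.

```python
import time


class TimedLock:
    """Hypothetical lock that expires after a timeout; not the NCache API."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._expires_at = None  # None means unlocked

    def is_locked(self):
        return self._expires_at is not None and self._clock() < self._expires_at

    def lock(self, seconds):
        # Refuse the lock if a previous, unexpired lock is still held.
        if self.is_locked():
            return False
        self._expires_at = self._clock() + seconds
        return True

    def unlock(self):
        self._expires_at = None


# A fake clock keeps the example deterministic.
now = [0.0]
lock = TimedLock(clock=lambda: now[0])

first_attempt = lock.lock(10)   # acquired
second_attempt = lock.lock(10)  # refused: already locked
now[0] = 11.0                   # 11 seconds later the lock has expired
third_attempt = lock.lock(10)   # acquired again
lock.unlock()
```

This mirrors the two outcomes handled in the samples: a true result means the lock is held for the requested duration, and a false result means another holder got there first.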
Additional Resources
NCache provides a sample application for the queue data structure on GitHub.
See Also
List Behavior and Usage in Cache
Sets Behavior and Usage in Cache
Dictionary Behavior and Usage in Cache
Counter Behavior and Usage in Cache