缓存中的队列行为和使用
队列是一种有序数据结构,采用先进先出 (FIFO) 行为原则。 您可以使用队列来维护运行时正在处理的信息。 例如,公司潜在候选人的申请 ID 可以存储在队列中。 由于队列本质上是先进先出的,第一个申请的候选人将首先接受面试,依此类推。
NCache 通过提供进一步增强队列数据结构 NCache-特定功能,例如 组别, 标签, 過期, 锁定, 依赖, 和更多。 在这种情况下,公司可以针对申请管理工作的人员队列指定一个组。
宠物行为研究
- 队列可以是任何原始类型或自定义对象。
- 一个队列
CacheItem
并且还不支持嵌套队列。
- 队列被命名。 因此,您需要为队列提供唯一的缓存键。
- Null 不是受支持的值类型。
- 支持重复值。
先决条件
- 了解与所有人员合作所需的标准先决条件 NCache 客户端功能请参阅给定页面 客户端 API 先决条件.
- 有关 API 详细信息,请参阅: 缓存, ID分布式队列, 数据类型管理器, 包含, 复制到, 数组, 窥视, 获取队列, 出队, 入队, 创建队列, 注册通知, 数据类型数据通知回调, 事件类型, 数据类型事件数据过滤器, 锁, 开锁.
- 了解与所有人员合作所需的标准先决条件 NCache 客户端功能请参阅给定页面 客户端 API 先决条件.
- 有关 API 详细信息,请参阅: 缓存, 分布式队列, 创建队列, 获取队列, 包含, 锁, 开锁, 事件类型, 获取事件类型, 添加更改监听器, 获取数据结构管理器, 数据结构改变时, 数据类型事件数据过滤器, 数据结构数据更改监听器, 数据结构事件参数, 获取集合项.
- 了解与所有人员合作所需的标准先决条件 NCache 客户端功能请参阅给定页面 客户端 API 先决条件.
- 有关 API 详细信息,请参阅: 缓存, 分布式队列, 获取数据结构管理器, 创建队列, 加, 获取队列, 包含, 复制到, 数组, 窥视, 去掉, 事件类型, 数据结构数据更改监听器, 添加更改监听器, 数据类型事件数据过滤器, 数据结构事件参数, 获取事件类型, 获取集合项.
- 了解与所有人员合作所需的标准先决条件 NCache 客户端功能请参阅给定页面 客户端 API 先决条件.
- 有关 API 详细信息,请参阅: 缓存, 数据结构管理器, 获取数据结构管理器, 分布式队列, 包含, 加, 获取迭代器, 去掉, 添加更改侦听器, 创建队列, 获取队列, 获取事件类型, 数据类型事件数据过滤器, 事件数据过滤器, 数据结构数据更改监听器, 数据结构改变时.
创建队列并添加数据
以下代码示例显示了一个队列如何 候选人 可以根据缓存键在缓存中创建类型 候选队列 以及如何将项目添加到队列中。
Tips:
你也可以 配置可搜索属性 例如组/标签/命名标签和 失效属性 例如创建数据结构时的 Expiration/Eviction/Dependency。
// Precondition: Cache must be connected
// Specify unique cache key for queue
string key = "CandidateQueue";
// Create Queue of Candidate type
IDistributedQueue<Candidate> queue = cache.DataTypeManager.CreateQueue<Candidate>(key);
Candidate[] candidates = FetchCandidates();
foreach(var candidate in candidates)
{
// Add candidates to queue
queue.Enqueue(candidate);
}
// Precondition: Cache must be connected
// Specify unique cache key for queue
String key = "CandidateQueue";
// Create Queue of Candidate type
DistributedQueue<Candidate> queue = cache.getDataStructuresManager().createQueue(key, Candidate.class);
Candidate[] candidates = fetchCandidates();
for (var candidate : candidates) {
// Add candidates to queue
queue.add(candidate);
}
// This is an async method
// Precondition: Cache must be connected
// Specify unique cache key for queue
var key = "CandidateQueue";
// Create Queue
var manager = await this.cache.getDataStructuresManager();
var queue = await manager.createQueue(key, ncache.JsonDataType.Object);
var candidates = this.fetchCandidates();
for (var candidate in candidates) {
// Add candidates to queue
queue.add(candidate);
}
# Precondition: Cache must be connected
# Specify unique cache key for queue
key = "CandidateQueue"
# Create Queue
manager = cache.get_data_structures_manager()
queue = manager.create_queue(key, Candidate)
candidates = fetch_candidates()
for candidate in candidates:
# Add candidates to queue
queue.add(candidate)
# Check if a candidate exists against given ID
candidate = get_candidate_by_id(1002)
if queue.contains(candidate):
# Candidate exists with this ID
print("Candidate found")
以下代码示例使用从缓存中获取队列 GetQueue
然后检查队列中是否存在项目 Contains
. 如何从队列的开头获取一个项目而不使用删除它 Peek
。 要将整个源队列复制到目标数组,请使用 CopyTo
,并使用将此队列复制到一维数组中 ToArray
.
// Queue with this key already exists in cache
string key = "CandidateQueue";
// Get queue and show items of queue
IDistributedQueue<Candidate> retrievedQueue = cache.DataTypeManager.GetQueue<Candidate>(key);
// Get any candidate ID
Candidate candidate = GetCandidateByID(1002);
// Check whether queue contains the a particular candidate or not
if(retrievedQueue.Contains(candidate))
{
// Candidate exists with this ID
}
// List of candidate's for the queue to be copied into
Candidate[] candidates = new Candidate[retrievedQueue.Count];
// Copy the queue to another array
retrievedQueue.CopyTo(candidates, 0);
// Copy queue to one-dimensional array
candidates = retrievedQueue.ToArray();
// Get an item at beginning from queue without removing
Candidate firstCandidate = retrievedQueue.Peek();
// Precondition: Cache is already connected
// Queue with this key already exists in cache
String key = "CandidateQueue";
// Get queue and show items of queue
DistributedQueue<Candidate> retrievedQueue = cache.getDataStructuresManager().getQueue(key, Candidate.class);
// Get any candidate's ID
Candidate candidate = getCandidateByID(1002);
// Check whether queue contains the a particular candidate or not
if(retrievedQueue.contains(candidate))
{
// Candidate exists with this ID
}
// List of candidates for the queue to be copied into
Candidate[] candidates = new Candidate[retrievedQueue.size()];
// Copy the queue to another array
retrievedQueue.copyTo(candidates, 0);
// Copy queue to one-dimensional array
candidates = retrievedQueue.toArray();
// Get an item at beginning from queue without removing
Candidate firstCandidate = retrievedQueue.peek();
// This is an async method
// Precondition: Cache is already connected
// Queue with this key already exists in cache
var key = "CandidateQueue";
// Get queue and show items of queue
var manager = await this.cache.getDataStructuresManager();
var retrievedQueue = await manager.getQueue(key, ncache.JsonDataType.Object);
// Get any candidate's ID
var candidate = this.getCandidateByID(1002);
// Check whether queue contains the a particular candidate or not
if(retrievedQueue.contains(candidate))
{
// Candidate exists with this ID
}
// List of candidates for the queue to be copied into
var candidates = new Candidate[retrievedQueue.length];
// Copy the queue to another array
retrievedQueue.copyTo(candidates, 0);
// Copy queue to one-dimensional array
var candidates = retrievedQueue.toArray();
// Get an item at beginning from queue without removing
var firstCandidate = retrievedQueue.peek();
# Precondition: Cache is already connected
# Queue with this key already exists in cache
key = "CandidateQueue"
# Get queue and show items of queue
manager = cache.get_data_structures_manager()
retrieved_queue = manager.get_queue(key, Candidate)
if retrieved_queue is not None:
for item in retrieved_queue.get_iterator():
# Perform operations
print(item)
else:
# Queue does not exist
print("Queue not found")
从队列中删除项目
项目可以以先进先出的方式从队列中移除。 以下代码示例从队列中删除第一个候选者。
// Queue with this key already exists in cache
string key = "CandidateQueue";
IDistributedQueue<Candidate> retrievedQueue = cache.DataTypeManager.GetQueue<Candidate>(key);
// Remove first item of queue
retrievedQueue.Dequeue();
// Precondition: Cache must be connected
// Queue with this key already exists in cache
String key = "CandidateQueue";
DistributedQueue<Candidate> retrievedQueue = cache.getDataStructuresManager().getQueue(key, Candidate.class);
// Remove first item of queue
retrievedQueue.remove();
// This is an async method
// Precondition: Cache must be connected
// Queue with this key already exists in cache
var key = "CandidateQueue";
var manager = await this.cache.getDataStructuresManager();
var retrievedQueue = await manager.getQueue(key, ncache.JsonDataType.Object);
// Remove first item of queue
await retrievedQueue.remove();
# Precondition: Cache must be connected
# Queue with this key already exists in cache
key = "CandidateQueue"
manager = cache.get_data_structures_manager()
retrieved_queue = manager.get_queue(key, Candidate)
# Remove first item of queue
retrieved_queue.remove()
备注
为确保操作是故障安全的,建议处理应用程序中的任何潜在异常,如中所述 处理故障.
队列上的事件通知
您可以在数据结构(例如队列)上注册缓存事件、基于键的事件和数据结构事件。 有关行为,请参阅 明智的行为.
以下代码示例注册了一个缓存事件 ItemAdded
和 ItemUpdated
以及注册一个事件 ItemAdded
和 ItemUpdated
在缓存中的队列中。 在缓存中创建队列后, ItemAdded
缓存级事件被触发。 然而,一旦一个项目被添加到队列中, ItemAdded
数据结构事件被触发,并且 ItemUpdated
缓存级别事件被触发。
在创建的队列上注册事件
// Unique cache key for queue
string key = "CandidateQueue";
// Create queue of Candidate type
IDistributedQueue<Candidate> queue = cache.DataTypeManager.CreateQueue<Candidate>(key);
// Register ItemAdded, ItemUpdated, ItemRemoved events on queue created
// DataTypeNotificationCallback is callback method specified
queue.RegisterNotification(DataTypeDataNotificationCallback, EventType.ItemAdded |
EventType.ItemUpdated | EventType.ItemRemoved,
DataTypeEventDataFilter.Data);
// Perform operations
// Precondition: Cache is already connected
// Unique cache key for the queue
String key = "CandidateQueue";
// Create queue of candidate type
DistributedQueue<Candidate> queue = cache.getDataStructuresManager().createQueue(key, Candidate.class);
// Create EnumSet of event types
EnumSet<EventType> enumSet = EnumSet.of(com.alachisoft.ncache.runtime.events.EventType.ItemAdded,
EventType.ItemUpdated, EventType.ItemRemoved);
// Register ItemAdded, ItemUpdated, ItemRemoved events on queue created
// dataChangeListener is the specified callback method
DataStructureDataChangeListener dataChangeListener = dataStructureListener.onDataStructureChanged(collectionName, args);
queue.addChangeListener(dataChangeListener, enumSet, DataTypeEventDataFilter.Data);
// Perform operations
// This is an async method
// Precondition: Cache is already connected
// Unique cache key for the queue
var key = "CandidateQueue";
// Create queue of candidate type
var queue = await this.cache
.getDataStructuresManager()
.createQueue(key, ncache.JsonDataType.Object);
// Create EnumSet of event types
var enumSet = enumSet.of(
ncache.EventType.ItemAdded,
ncache.EventType.ItemUpdated,
ncache.EventType.ItemRemoved
);
// Register ItemAdded, ItemUpdated, ItemRemoved events on queue created
// dataChangeListener is the specified callback method
var dataChangeListener = this.dataStructureListener.onDataStructureChanged(
collectionName,
args
);
queue.addChangeListener(
dataChangeListener,
enumSet,
ncache.DataTypeEventDataFilter.Data
);
// Perform operations
def datastructure_callback_function(collection_name, collection_event_args):
# Perform Operations
print("Event Fired for " + str(collection_name))
# Precondition: Cache is already connected
# Unique cache key for queue
key = "CandidateQueue"
# Create queue
queue = cache.get_data_structures_manager().create_queue(key, Candidate)
# Register ItemAdded, ItemUpdated, ItemRemoved events on queue created
events_list = [ncache.EventType.ITEM_ADDED, ncache.EventType.ITEM_UPDATED, ncache.EventType.ITEM_REMOVED]
queue.add_change_listener(datastructure_callback_function, events_list, ncache.DataTypeEventDataFilter.DATA)
指定事件通知的回调
private void DataTypeDataNotificationCallback(string collectionName, DataTypeEventArg collectionEventArgs)
{
switch (collectionEventArgs.EventType)
{
case EventType.ItemAdded:
// Item has been added to the collection
break;
case EventType.ItemUpdated:
if (collectionEventArgs.CollectionItem != null)
{
// Item has been updated in the collection
// Perform operations
}
break;
case EventType.ItemRemoved:
// Item has been removed from the collection
break;
}
}
DataStructureDataChangeListener dataStructureListener = new DataStructureDataChangeListener() {
@Override
public void onDataStructureChanged(String collection, DataStructureEventArg dataStructureEventArg) {
switch (dataStructureEventArg.getEventType()) {
case ItemAdded:
// Item has been added to the collection
break;
case ItemUpdated:
if (dataStructureEventArg.getCollectionItem() != null) {
//Item has been updated in the collection
// perform operations
}
break;
case ItemRemoved:
//Item has been removed from the collection
break;
}
}
};
dataStructureListener = new ncache.DataStructureDataChangeListener();
{
function onDataStructureChanged(collection, dataStructureEventArg) {
switch (dataStructureEventArg.getEventType()) {
case ncache.EventType.ItemAdded:
//Item has been added to the collection
break;
case ncache.EventType.ItemUpdated:
if (dataStructureEventArg.getCollectionItem() != null) {
//Item has been updated in the collection
// perform operations
}
break;
case ncache.EventType.ItemRemoved:
//Item has been removed from the collection
break;
}
}
}
def datastructure_callback_function(collection_name: str, collection_event_args: DataStructureEventArg):
if collection_event_args.get_event_type() is ncache.EventType.ITEM_ADDED:
# Item has been added to the collection
print("Item added in " + collection_name)
elif collection_event_args.get_event_type() is ncache.EventType.ITEM_UPDATED:
# Item has been updated in the collection
print("Item updated in " + collection_name)
elif collection_event_args.get_event_type() is ncache.EventType.ITEM_REMOVED:
# Item has been removed from the collection
print("Item removed from " + collection_name)
锁定队列
队列可以显式锁定和解锁以确保数据一致性。 以下代码示例创建一个队列并将其锁定 10 秒,使用 锁 然后使用解锁它 开锁.
// Queue exists with key "CandidateQueue" cache key
string key = "CandidateQueue";
// Get queue
IDistributedQueue<Candidate> queue = cache.DataTypeManager.GetQueue<Candidate>(key);
// Lock queue for 10 seconds
bool isLocked = queue.Lock(TimeSpan.FromSeconds(10));
if (isLocked)
{
// Queue is successfully locked for 10 seconds
// Unless explicitly unlocked
}
else
{
// Queue is not locked because either:
// Queue is not present in the cache
// Queue is already locked
}
queue.Unlock();
// Preconditions: Cache is already connected
// Queue exists with key "CandidateQueue" cache key
String key = "CandidateQueue";
// Get queue
DistributedQueue<Candidate> queue = cache.getDataStructuresManager().getQueue(key, Candidate.class);
// Lock queue for 10 seconds
boolean isLocked = queue.lock(TimeSpan.FromSeconds(10));
if (isLocked) {
// Queue is successfully locked for 10 seconds
// Unless explicitly unlocked
} else {
// Queue is not locked because either:
// Queue is not present in the cache
// Queue is already locked
}
queue.unlock();
更多资讯
NCache 为队列数据结构提供了一个示例应用程序 GitHub上.
参见
.NET: Alachisoft.NCache.Client.DataTypes 命名空间。
Java的: COM。alachisoft.ncache.client.数据结构 命名空间。
节点.js: 分布式队列 类。
Python: ncache.client.数据结构 类。