연속 쿼리 사용
필수 검색 가능 속성을 색인화했다고 가정하면 이제 애플리케이션에서 연속 쿼리를 구현해야 합니다. 연속 쿼리의 목적을 염두에 두고 가장 먼저 해야 할 일은 쿼리 결과 집합이 어떤 방식으로든 변경되면 실행해야 하는 모든 콜백을 정의하는 것입니다. 그런 다음 캐시 서버에 연속 쿼리를 등록해야 합니다.
모든 응용 프로그램에서 쿼리 결과 집합의 추적이 필요하지 않은 경우 알림 등록을 취소할 뿐만 아니라 캐시에서 쿼리도 등록 취소해야 합니다.
사전 조건
- 모든 작업에 필요한 표준 전제 조건에 대해 알아보려면 NCache 클라이언트측 기능은 다음 페이지를 참조하세요. 클라이언트 측 API 전제 조건.
- 검색 가능한 개체 및 해당 속성에 대한 인덱싱은 에 설명된 대로 먼저 구성해야 합니다. 쿼리 인덱스 구성 관리자 안내서에 있습니다.
- 캐시에는 구성된 속성과 관련된 일부 데이터가 있어야 합니다.
- API 세부 정보는 다음을 참조하세요. 아이캐시, 이벤트 데이터 필터, 이벤트 유형, ExecuteReader, CQ등록, 등록 취소CQ, 등록 취소 알림, 연속 쿼리, 등록 알림
, QueryDataNotification콜백, ICacheReader, 필드카운트, 읽기, 끼워 넣다, CQEventArg, 값 가져오기.
- 모든 작업에 필요한 표준 전제 조건에 대해 알아보려면 NCache 클라이언트측 기능은 다음 페이지를 참조하세요. 클라이언트 측 API 전제 조건.
- 검색 가능한 개체 및 해당 속성에 대한 인덱싱은 에 설명된 대로 먼저 구성해야 합니다. 쿼리 인덱스 구성 관리자 안내서에 있습니다.
- 캐시에는 구성된 속성과 관련된 일부 데이터가 있어야 합니다.
- API 세부 정보는 다음을 참조하세요. 캐시, 이벤트 데이터 필터, 이벤트 유형, 실행 리더, 레지스터CQ, 등록 취소CQ, RemoveDataModificationListener, 캐시 아이템, 캐시 항목 버전, 삽입하다, CQEventArg, 이벤트 유형, 읽기, 값을 얻다, 연속 쿼리, addDataModificationListener, getFieldCount, 캐시 리더.
- 모든 작업에 필요한 표준 전제 조건에 대해 알아보려면 NCache 클라이언트측 기능은 다음 페이지를 참조하세요. 클라이언트 측 API 전제 조건.
- 검색 가능한 개체 및 해당 속성에 대한 인덱싱은 에 설명된 대로 먼저 구성해야 합니다. 쿼리 인덱스 구성 관리자 안내서에 있습니다.
- 캐시에는 구성된 속성과 관련된 일부 데이터가 있어야 합니다.
- API 세부 정보는 다음을 참조하세요. 캐시, 이벤트 데이터 필터, 이벤트 유형, 실행 리더, 레지스터CQ, 등록 취소CQ, RemoveDataModificationListener, 캐시 아이템, 삽입하다, 이벤트 유형, 값을 얻다, 읽기, getFieldCount, 연속 쿼리, addDataModificationListener.
- 모든 작업에 필요한 표준 전제 조건에 대해 알아보려면 NCache 클라이언트측 기능은 다음 페이지를 참조하세요. 클라이언트 측 API 전제 조건.
- 검색 가능한 개체 및 해당 속성에 대한 인덱싱은 에 설명된 대로 먼저 구성해야 합니다. 쿼리 인덱스 구성 관리자 안내서에 있습니다.
- 캐시에는 구성된 속성과 관련된 일부 데이터가 있어야 합니다.
- API 세부 정보는 다음을 참조하세요. get_event_type, 실행 판독기, 레지스터_cq,
un_register_cq, Remove_data_modification_listener, 캐시 아이템, 삽입하다, get_field_count, add_data_modification_listener, 연속 쿼리, 읽기, get_value, 이벤트 유형, CQEventArg.
1단계: 이벤트에 대한 콜백 등록
검색 가능한 필수 속성을 인덱싱했다고 가정하면 애플리케이션에서 연속 쿼리를 구현할 수 있습니다. 이렇게 하려면 쿼리의 결과 집합이 수정되면 실행해야 하는 모든 콜백을 정의해야 합니다. 그런 다음 캐시 서버에 Continuous Query를 등록해야 합니다.
여러 이벤트에 대해 콜백을 등록할 수 있습니다.
public void OnChangeInQueryResultSet(string key, CQEventArg arg)
{
switch (arg.EventType)
{
case EventType.ItemAdded:
Console.WriteLine($"Item with key '{key}' has been added to resut set of continuous query");
break;
case EventType.ItemUpdated:
Console.WriteLine($"Item with key '{key}' has been updated in the resut set of continuous query");
// Get updated Product object
// Item can be used if EventDataFilter is DataWithMetadata or Metadata
if (arg.Item != null)
{
Product updatedProduct = arg.Item.GetValue<Product>();
Console.WriteLine($"Updated product '{updatedProduct.ProductName}' with key '{key}' has ID '{updatedProduct.ProductID}'");
}
break;
case EventType.ItemRemoved:
Console.WriteLine($"Item with key '{key}' has been removed from resut set of continuous query");
break;
}
}
public static void onChangeInQueryResultSet(String key, CQEventArg arg) {
switch (arg.getEventType()) {
case ItemAdded:
System.out.println("Item with key '" + key + "' has been added to result set of continuous query");
break;
case ItemUpdated:
System.out.println("Item with key '" + key + "' has been updated in the result set of continuous query");
// Get updated Product object
// Item can be used if EventDataFilter is DataWithMetadata or Metadata
if (arg.getItem() != null) {
Product updatedProduct = (Product) arg.getItem().getValue(Product.class);
System.out.println("Updated product '" + updatedProduct.getProductName() + "' with key '" + key + "' has ID '" + updatedProduct.getProductID() + "'");
}
break;
case ItemRemoved:
System.out.println("Item with key '" + key + "' has been removed from result set of the continuous query");
break;
}
}
// This is an async method
queryItemCallback(key, arg)
{
switch (arg.getEventType())
{
case ncache.ItemAdded:
// Key has been added to the cache
break;
case ncache.ItemUpdated:
// Key has been updated in the cache
// Get updated product object
if (arg.getItem() != null)
{
let updatedProduct = arg.getItem().getValue(Product);
// Perform operations accordingly
}
break;
case ncache.ItemRemoved:
// key has been removed from the cache
break;
}
}
# Precondition: Cache is already connected
def query_item_callback(key: str, arg: ncache.CQEventArg):
if arg.get_event_type() is ncache.EventType.ITEM_ADDED:
# Key has been added to the cache
print(key + " added to cache")
if arg.get_event_type() is ncache.EventType.ITEM_UPDATED:
# Key has been updated in the cache
# Get updated product object
if arg.get_item() is not None:
updated_product = arg.get_item().get_value(Product)
# Perform operations accordingly
if arg.get_event_type() is ncache.EventType.ITEM_REMOVED:
# key has been removed from the cache
print(key + " removed from cache")
주의 사항
작업이 안전하도록 하려면 에 설명된 대로 응용 프로그램 내에서 잠재적인 예외를 처리하는 것이 좋습니다. 처리 실패.
2단계: 쿼리 및 알림 등록
콜백이 등록된 후 이벤트가 실행될 결과 집합에 대한 기준을 지정하는 연속 쿼리를 만듭니다. 이 쿼리는 서버에 대해 등록됩니다.
연속 쿼리가 생성되면 미리 정의된 콜백이 쿼리에 등록됩니다. 콜백은 다음에 따라 등록됩니다. EventType
와 EventDataFilter
.
연속 쿼리는 이제 다음을 사용하여 서버에 등록할 수 있습니다. RegisterCQ
. 애플리케이션에서 이 메서드를 여러 번 사용하여 쿼리 데이터 세트의 변경 사항에 대한 알림을 받을 수 있습니다.
캐시 이벤트 알림의 수정 사항은 이벤트 유형에 따라 트리거됩니다. 캐시된 데이터를 쿼리하려면 ExecuteReader
쿼리를 실행하면 생성된 결과 집합이 클라이언트 측에서 청크 단위로 읽혀집니다.
결과 집합에 영향을 미치도록 캐시 데이터를 수정하여 이벤트를 트리거할 수 있습니다. 코드 샘플은 쿼리 결과 집합에 추가되도록 기존 캐시 항목을 업데이트하여 ItemAdded
행사.
경고
서버와 클라이언트 간의 연결이 끊어지면 이 기간 내에 실행된 이벤트는 클라이언트에서 수신되지 않습니다.
// Query for required operation
string query = "SELECT $VALUE$ FROM Alachisoft.NCache.Samples.Data.Product WHERE Category = ?";
// Create query command and add parameters
var queryCommand = new QueryCommand(query);
queryCommand.Parameters.Add("Category", "Beverages");
// Create Continuous Query
var cQuery = new ContinuousQuery(queryCommand);
// Item add, update, remove notification
// EventDataFilter.DataWithMetadata returns the cache keys added
cQuery.RegisterNotification(new QueryDataNotificationCallback(OnChangeInQueryResultSet), EventType.ItemAdded | EventType.ItemUpdated | EventType.ItemRemoved, EventDataFilter.DataWithMetadata);
// Register continuousQuery on server
cache.MessagingService.RegisterCQ(cQuery);
// Query for the desired operation
String query = "SELECT $VALUE$ FROM CQ.Product WHERE productId = ?";
// Create query command
QueryCommand queryCommand = new QueryCommand(query);
// Set parameters
HashMap<String, Object> parameters = queryCommand.getParameters();
parameters.put("productId", "Beverages");
// Create Continuous Query
ContinuousQuery cQuery = new ContinuousQuery(queryCommand);
// Register notifications for continuous query (ItemAdded, ItemUpdated, and ItemRemoved events)
cQuery.addDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg arg) {
// Your callback method to handle the notifications
// For instance, you can call the 'onChangeInQueryResultSet' method from your earlier code here
onChangeInQueryResultSet(key, arg);
}
}, EnumSet.of(EventType.ItemAdded, EventType.ItemUpdated, EventType.ItemRemoved), EventDataFilter.DataWithMetadata);
// Register the continuous query on the server
cache.getMessagingService().registerCQ(cQuery);
System.out.println("Registered Successfully");
// Precondition: Cache is already connected
// Query for required operation
let query = "SELECT Values FROM FQN.Product WHERE Category = ?";
var queryCommand = new QueryCommand(query);
queryCommand.getParameters().put("Category", "Beverages");
// Create continuous query
let continuousQuery = new ncache.ContinuousQuery(queryCommand);
let listener = new ncache.EventFilter();
var eventType = ncache.EnumSet.of(ncache.EventType.ItemAdded, ncache.EventType.ItemRemoved, ncache.EventType.ItemUpdated);
// Item remove notification
// EventDataFilter.Metadata returns cache keys + item metadata on updation
continuousQuery.addDataModificationListener(listener, eventType, ncache.EventDataFilter.None);
// Register continuous query on server
await this.cache.getMessagingService().registerCQ(continuousQuery);
let reader = await this.cache.getSearchService().executeReader(queryCommand);
if (reader.getFieldCount() > 0)
{
while (reader.read())
{
let result = reader.getValue(1, Product);
// Perform operations
}
}
else
{
// Null query result set returned
}
// Update Product data in cache to trigger callback
let updatedProduct = new Product();
updatedProduct.setProductID(1001);
updatedProduct.setProductName("Tea");
let key = "Product:" + updatedProduct.getProductID();
let cacheItem = new ncache.CacheItem(updatedProduct);
// Trigger add notifications
let version = await this.cache.insert(key, cacheItem);
// This will add item to the result set as it matches query criteria
# Query for required operation
query = "SELECT $Value$ FROM FQN.Product WHERE category = ?"
query_command = ncache.QueryCommand(query)
query_command.set_parameters({"Category": "Beverages"})
# Create continuous query
continuous_query = ncache.ContinuousQuery(query_command)
event_type = [ncache.EventType.ITEM_REMOVED]
# Item remove notification
# EventDataFilter.Metadata returns cache keys + item metadata on updation
continuous_query.add_data_modification_listener(cq_event_listener, event_type, ncache.EventDataFilter.NONE)
# Register continuous query on server
cache.get_messaging_service().register_cq(continuous_query)
reader = cache.get_search_service().execute_reader(query_command)
if reader.get_field_count() > 0:
while reader.read():
result = reader.get_value(Product, 1)
# Perform operations
else:
# None query result set returned
print("Query result is None")
# Update Product data in cache to trigger callback
updated_product = Product()
updated_product.set_product_id(1001)
updated_product.set_product_name("Tea")
key = "Product:" + updated_product.get_product_id()
cache_item = ncache.CacheItem(updated_product)
# Trigger add notifications
version = cache.insert(key, cache_item)
# This will add item to the result set as it matches query criteria
3단계: 연속 쿼리에서 알림 등록 취소
애플리케이션에 더 이상 필요하지 않은 알림은 연속 쿼리에서 등록 취소할 수 있습니다. 여러 이벤트 유형이 등록된 경우 특정 이벤트 유형에 대한 알림을 등록 취소할 수 있습니다. UnRegisterNotification
방법.
예를 들어, ItemAdded
와 ItemRemoved
이벤트 유형이 등록되었지만 비즈니스 로직에 더 이상 이벤트가 필요하지 않습니다. ItemAdded
, 특히 다음에 대한 알림 등록을 취소할 수 있습니다. ItemAdded
이벤트.
// Unregister notifications for ItemAdded events from continuous query
cQuery.UnRegisterNotification(new QueryDataNotificationCallback(OnChangeInQueryResultSet), EventType.ItemAdded);
// Unregister notifications for ItemAdded events only
cQuery.removeDataModificationListener(new QueryDataModificationListener() {
@Override
public void onQueryDataModified(String key, CQEventArg arg) {
// Your callback method to handle the notifications
// For instance, you can call the 'onChangeInQueryResultSet' method from your earlier code here
onChangeInQueryResultSet(key, arg);
}
}, EnumSet.of(EventType.ItemAdded, EventType.ItemUpdated, EventType.ItemRemoved));
// Unregister notifications for ItemAdded events only
var eventType = ncache.EnumSet.of(ncache.EventType.ADDED);
continuousQuery.removeDataModificationListener(listener, eventType);
# Unregister notifications for ItemAdded events only
event_type = [ncache.EventType.ITEM_ADDED]
c_query.remove_data_modification_listener(cq_event_listener, event_type)
4단계: 서버에서 연속 쿼리 등록 취소
애플리케이션이 더 이상 쿼리 결과 집합의 변경 사항에 대한 알림을 받는 데 관심이 없으면 등록된 연속 쿼리를 서버에서 등록 취소해야 합니다.
UnregisterCQ
인수로 객체를 취합니다. ContinuousQuery
이 호출 후에 더 이상 실행되지 않는 콜백을 등록 취소합니다.
// Unregister Continuous Query from server
cache.MessagingService.UnRegisterCQ(cQuery);
// Unregister cq from server
cache.getMessagingService().unRegisterCQ(cQuery);
// Unregister cq from server
await this.cache.getMessagingService().unRegisterCQ(continuousQuery);
# Unregister cq from server
cache.get_messaging_service().un_register_cq(c_query)
추가 자료
NCache 연속 쿼리에 대한 샘플 애플리케이션을 제공합니다. GitHub의.
도 참조
.그물: Alachisoft.NCache.런타임.이벤트 네임 스페이스.
자바 : COM.alachisoft.ncache.이벤트 네임 스페이스.
Node.js : 연속 쿼리 클래스입니다.
파이썬 : ncache.runtime.caching.events 클래스입니다.