Use continuous querying in your clients to receive continuous updates to queries run on the servers.
Before you begin, you should be familiar with Querying and have your client/server system configured.
Configure the client pools you will use for CQs with subscription-enabled
set to true.
To have CQ and interest subscription events arrive as closely together as possible, use a single pool for everything. Different pools might use different servers, which can lead to greater differences in event delivery time.
Write your OQL query to retrieve the data you need from the server.
The query must satisfy these CQ requirements in addition to the standard GemFire querying specifications:
The query must be a SELECT expression only, preceded by zero or more IMPORT statements. This means the query cannot be a statement such as “/tradeOrder.name”
or “(SELECT * from /tradeOrder).size”.
The CQ query cannot use:
The CQ query must be created on a partitioned or replicated region. See Region Type Restrictions for CQs.
The basic syntax for the CQ query is:
SELECT * FROM /fullRegionPath [iterator] [WHERE clause]
This example query could be used to get all trade orders where the price is over $100:
SELECT * FROM /tradeOrder t WHERE t.price > 100.00
Write your CQ listeners to handle CQ events from the server. Implement org.apache.geode.cache.query.CqListener
in each event handler you need. In addition to your main CQ listeners, you might have listeners that you use for all CQs to track statistics or other general information.
Note: Be especially careful if you choose to update your cache from your CqListener
. If your listener updates the region that is queried in its own CQ and that region has a Pool
named, the update will be forwarded to the server. If the update on the server satisfies the same CQ, it may be returned to the same listener that did the update, which could put your application into an infinite loop. This same scenario could be played out with multiple regions and multiple CQs, if the listeners are programmed to update each other’s regions.
This example outlines a CqListener
that might be used to update a display screen with current data from the server. The listener gets the queryOperation
and entry key and value from the CqEvent
and then updates the screen according to the type of queryOperation
.
// CqListener class
public class TradeEventListener implements CqListener
{
public void onEvent(CqEvent cqEvent)
{
// org.apache.geode.cache Operation associated with the query op
Operation queryOperation = cqEvent.getQueryOperation();
// key and new value from the event
Object key = cqEvent.getKey();
TradeOrder tradeOrder = (TradeOrder)cqEvent.getNewValue();
if (queryOperation.isUpdate())
{
// update data on the screen for the trade order . . .
}
else if (queryOperation.isCreate())
{
// add the trade order to the screen . . .
}
else if (queryOperation.isDestroy())
{
// remove the trade order from the screen . . .
}
}
public void onError(CqEvent cqEvent)
{
// handle the error
}
// From CacheCallback public void close()
{
// close the output screen for the trades . . .
}
}
When you install the listener and run the query, your listener will handle all of the CQ results.
If you need your CQs to detect whether they are connected to any of the servers that host its subscription queues, implement a CqStatusListener
instead of a CqListener
. CqStatusListener
extends the current CqListener
, allowing a client to detect when a CQ is connected and/or disconnected from the server(s). The onCqConnected()
method will be invoked when the CQ is connected, and when the CQ has been reconnected after being disconnected. The onCqDisconnected()
method will be invoked when the CQ is no longer connected to any servers.
Taking the example from step 3, we can instead implement a CqStatusListener
:
public class TradeEventListener implements CqStatusListener
{
public void onEvent(CqEvent cqEvent)
{
// org.apache.geode.cache Operation associated with the query op
Operation queryOperation = cqEvent.getQueryOperation();
// key and new value from the event
Object key = cqEvent.getKey();
TradeOrder tradeOrder = (TradeOrder)cqEvent.getNewValue();
if (queryOperation.isUpdate())
{
// update data on the screen for the trade order . . .
}
else if (queryOperation.isCreate())
{
// add the trade order to the screen . . .
}
else if (queryOperation.isDestroy())
{
// remove the trade order from the screen . . .
}
}
public void onError(CqEvent cqEvent)
{
// handle the error
}
// From CacheCallback public void close()
{
// close the output screen for the trades . . .
}
public void onCqConnected() {
//Display connected symbol
}
public void onCqDisconnected() {
//Display disconnected symbol
}
}
When you install the CqStatusListener
, your listener will be able to detect its connection status to the servers that it is querying.
Program your client to run the CQ:
CqAttributesFactory
and use it to set your CqListener
s and CqStatusListener
.QueryService
to create a new CqQuery
.CqQuery
object. You can execute with or without an initial result set.// Get cache and queryService - refs to local cache and QueryService
// Create client /tradeOrder region configured to talk to the server
// Create CqAttribute using CqAttributeFactory
CqAttributesFactory cqf = new CqAttributesFactory();
// Create a listener and add it to the CQ attributes callback defined below
CqListener tradeEventListener = new TradeEventListener();
cqf.addCqListener(tradeEventListener);
CqAttributes cqa = cqf.create();
// Name of the CQ and its query
String cqName = "priceTracker";
String queryStr = "SELECT * FROM /tradeOrder t where t.price > 100.00";
// Create the CqQuery
CqQuery priceTracker = queryService.newCq(cqName, queryStr, cqa);
try
{ // Execute CQ, getting the optional initial result set
// Without the initial result set, the call is priceTracker.execute();
SelectResults sResults = priceTracker.executeWithInitialResults();
for (Object o : sResults) {
Struct s = (Struct) o;
TradeOrder to = (TradeOrder) s.get("value");
System.out.println("Intial result includes: " + to);
}
}
catch (Exception ex)
{
ex.printStackTrace();
}
// Now the CQ is running on the server, sending CqEvents to the listener
. . .
// End of life for the CQ - clear up resources by closing
priceTracker.close();
With continuous queries, you can optionally implement: