This topic explains how to conflate events in a queue in VMware Tanzu GemFire.
Conflating a queue improves distribution performance. When conflation is enabled, only the latest queued value is sent for a particular key.
Note: Do not use conflation if your receiving applications depend on the specific ordering of entry modifications, or if they need to be notified of every change to an entry.
Conflation is most useful when a single entry is updated frequently, but other sites only need to know the current value of the entry (rather than the value of each update). When an update is added to a queue that has conflation enabled, if there is already an update message in the queue for the entry key, then the existing message is removed and the new update is added to the end of the queue, as shown here for key A.
Note: This method of conflation is different from the one used for server-to-client subscription queue conflation and peer-to-peer distribution within a cluster.
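The queue behavior described above can be illustrated with a small, self-contained sketch. This is a toy model rather than GemFire's implementation: the class `ConflatingQueueSketch` and its methods are hypothetical names chosen for illustration, and updates are reduced to string key/value pairs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a conflating queue (hypothetical; not GemFire code). */
public class ConflatingQueueSketch {
    // Insertion-ordered map: at most one pending update per key, oldest first.
    private final Map<String, String> pending = new LinkedHashMap<>();

    /** Queues an update, replacing any update already queued for the same key. */
    public void enqueue(String key, String value) {
        pending.remove(key);      // remove the existing message for this key, if any
        pending.put(key, value);  // add the new update to the end of the queue
    }

    /** Returns the queued updates in delivery order as "key=value" strings. */
    public List<String> contents() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : pending.entrySet()) {
            out.add(e.getKey() + "=" + e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        ConflatingQueueSketch queue = new ConflatingQueueSketch();
        queue.enqueue("A", "1");
        queue.enqueue("B", "1");
        queue.enqueue("A", "2"); // conflates with the update already queued for A
        System.out.println(queue.contents()); // prints [B=1, A=2]
    }
}
```

In this model the receiver never sees `A=1`, and the surviving update for `A` is delivered after `B=1` even though `A` was first modified earlier. This is why conflation is unsuitable when receivers depend on update ordering or on seeing every change.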
To enable conflation for a gateway sender queue, use one of the following mechanisms:
cache.xml configuration
<cache>
<gateway-sender id="NY" parallel="true"
remote-distributed-system-id="1"
enable-persistence="true"
disk-store-name="gateway-disk-store"
enable-batch-conflation="true"/>
...
</cache>
Java API configuration
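import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySenderFactory;

Cache cache = new CacheFactory().create();
GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
gateway.setParallel(true);
gateway.setPersistenceEnabled(true);
gateway.setDiskStoreName("gateway-disk-store");
// Conflate queued updates so that only the latest value per key is sent
gateway.setBatchConflationEnabled(true);
// The remote distributed system ID is an int, not a String
GatewaySender sender = gateway.create("NY", 1);
sender.start();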
Entry updates in the current, in-process batch are not eligible for conflation.
gfsh:
gfsh>create gateway-sender --id="NY" --parallel=true
--remote-distributed-system-id="1"
--enable-persistence=true
--disk-store-name="gateway-disk-store"
--enable-batch-conflation=true
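The note that updates in the current, in-process batch are not eligible for conflation can be pictured with another toy model. The sketch below is an assumption-laden illustration, not GemFire's dispatcher: `BatchConflationSketch`, `startBatch`, and the string-based events are hypothetical. It shows only that an update already pulled into a batch is left alone, while later updates for the same key conflate among themselves in the queue.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of conflation with an in-process batch (hypothetical; not GemFire code). */
public class BatchConflationSketch {
    // Updates still waiting in the queue: eligible for conflation.
    private final Map<String, String> queued = new LinkedHashMap<>();
    // Updates already taken for dispatch: no longer eligible for conflation.
    private final List<String> inProcessBatch = new ArrayList<>();

    /** Queues an update, conflating only against updates still in the queue. */
    public void enqueue(String key, String value) {
        queued.remove(key);
        queued.put(key, value);
    }

    /** Starts a new batch by moving up to batchSize updates from the head of the queue. */
    public void startBatch(int batchSize) {
        inProcessBatch.clear(); // assume the previous batch has been sent
        Iterator<Map.Entry<String, String>> it = queued.entrySet().iterator();
        while (it.hasNext() && inProcessBatch.size() < batchSize) {
            Map.Entry<String, String> e = it.next();
            inProcessBatch.add(e.getKey() + "=" + e.getValue());
            it.remove();
        }
    }

    public List<String> inProcessBatch() {
        return new ArrayList<>(inProcessBatch);
    }

    public List<String> queued() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : queued.entrySet()) {
            out.add(e.getKey() + "=" + e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        BatchConflationSketch sender = new BatchConflationSketch();
        sender.enqueue("A", "1");
        sender.startBatch(1);      // A=1 is now in the in-process batch
        sender.enqueue("A", "2");  // queued; does not replace A=1 in the batch
        sender.enqueue("A", "3");  // conflates with the queued A=2
        System.out.println(sender.inProcessBatch()); // prints [A=1]
        System.out.println(sender.queued());         // prints [A=3]
    }
}
```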
The following examples show how to configure conflation for an asynchronous event queue:
cache.xml configuration
<cache>
<async-event-queue id="sampleQueue" persistent="true"
disk-store-name="async-disk-store" parallel="false"
enable-batch-conflation="true">
<async-event-listener>
<class-name>MyAsyncEventListener</class-name>
<parameter name="url">
<string>jdbc:db2:SAMPLE</string>
</parameter>
<parameter name="username">
<string>gfeadmin</string>
</parameter>
<parameter name="password">
<string>admin1</string>
</parameter>
</async-event-listener>
</async-event-queue>
...
</cache>
Java API configuration
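import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.asyncqueue.AsyncEventQueueFactory;

Cache cache = new CacheFactory().create();
AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
factory.setPersistent(true);
factory.setDiskStoreName("async-disk-store");
factory.setParallel(false);
factory.setBatchConflationEnabled(true);
AsyncEventListener listener = new MyAsyncEventListener();
// Use the same queue ID as the cache.xml and gfsh examples
AsyncEventQueue sampleQueue = factory.create("sampleQueue", listener);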
Entry updates in the current, in-process batch are not eligible for conflation.
gfsh:
gfsh>create async-event-queue --id="sampleQueue" --persistent=true
--disk-store="async-disk-store" --parallel="false"
--enable-batch-conflation=true
--listener=MyAsyncEventListener
--listener-param=url#jdbc:db2:SAMPLE
--listener-param=username#gfeadmin
--listener-param=password#admin1