By default, Tanzu GemFire uses multiple dispatcher threads to process region events simultaneously in a gateway sender queue for distribution between sites, or in an asynchronous event queue for distributing events for write-behind caching. With serial queues, you can also configure the ordering policy for dispatching those events.
By default, a gateway sender queue or asynchronous event queue uses 5 dispatcher threads per queue. This provides support for applications that have the ability to process queued events concurrently for distribution to another Tanzu GemFire site or listener. If your application does not require concurrent distribution, or if you do not have enough resources to support the requirements of multiple dispatcher threads, then you can configure a single dispatcher thread to process a queue.
When multiple dispatcher threads are configured for a parallel queue, Tanzu GemFire simply uses multiple threads to process the contents of each individual queue. The total number of queues that are created is still determined by the number of Tanzu GemFire members that host the region.
When multiple dispatcher threads are configured for a serial queue, Tanzu GemFire creates an additional copy of the queue for each thread on each member that hosts the queue. To obtain the maximum throughput, increase the number of dispatcher threads until your network is saturated.
The following diagram illustrates a serial gateway sender queue that is configured with multiple dispatcher threads.
When a serial gateway sender or an asynchronous event queue uses multiple dispatcher threads, consider the following:
maximum-queue-memory
setting applies to each copy of the serial queue. If you configure 10 dispatcher threads and the maximum queue memory is set to 100MB, then the total maximum queue memory for the queue is 1000MB on each member that hosts the queue.When using multiple dispatcher-threads
(greater than 1) with a serial event queue, you can also configure the order-policy
that those threads use to distribute events from the queue. The valid order policy values are:
PartitionResolver
to implement custom partitioning. With partition ordering, all entries that share the same “partitioning key” (RoutingObject) are placed into the same dispatcher thread queue.You cannot configure the order-policy
for a parallel event queue, because parallel queues cannot preserve event ordering for regions. Only the ordering of events for a given partition (or in a given queue of a distributed region) can be preserved.
To increase the number of dispatcher threads and set the ordering policy for a serial gateway sender, use one of the following mechanisms.
cache.xml configuration
<cache>
<gateway-sender id="NY" parallel="false"
remote-distributed-system-id="1"
enable-persistence="true"
disk-store-name="gateway-disk-store"
maximum-queue-memory="200"
dispatcher-threads=7 order-policy="key"/>
...
</cache>
Java API configuration
Cache cache = new CacheFactory().create();
GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
gateway.setParallel(false);
gateway.setPersistenceEnabled(true);
gateway.setDiskStoreName("gateway-disk-store");
gateway.setMaximumQueueMemory(200);
gateway.setDispatcherThreads(7);
gateway.setOrderPolicy(OrderPolicy.KEY);
GatewaySender sender = gateway.create("NY", "1");
sender.start();
gfsh:
gfsh>create gateway-sender -d="NY"
--parallel=false
--remote-distributed-system-id="1"
--enable-persistence=true
--disk-store-name="gateway-disk-store"
--maximum-queue-memory=200
--dispatcher-threads=7
--order-policy="key"
The following examples show how to set dispatcher threads and ordering policy for an asynchronous event queue:
cache.xml configuration
<cache>
<async-event-queue id="sampleQueue" persistent="true"
disk-store-name="async-disk-store" parallel="false"
dispatcher-threads=7 order-policy="key">
<async-event-listener>
<class-name>MyAsyncEventListener</class-name>
<parameter name="url">
<string>jdbc:db2:SAMPLE</string>
</parameter>
<parameter name="username">
<string>gfeadmin</string>
</parameter>
<parameter name="password">
<string>admin1</string>
</parameter>
</async-event-listener>
</async-event-queue>
...
</cache>
Java API configuration
Cache cache = new CacheFactory().create();
AsyncEventQueueFactory factory = cache.createAsyncEventQueueFactory();
factory.setPersistent(true);
factory.setDiskStoreName("async-disk-store");
factory.setParallel(false);
factory.setDispatcherThreads(7);
factory.setOrderPolicy(OrderPolicy.KEY);
AsyncEventListener listener = new MyAsyncEventListener();
AsyncEventQueue sampleQueue = factory.create("customerWB", listener);
Entry updates in the current, in-process batch are not eligible for conflation.
gfsh:
gfsh>create async-event-queue --id="sampleQueue" --persistent=true
--disk-store="async-disk-store" --parallel=false
--dispatcher-threads=7 order-policy="key"
--listener=myAsycEventListener
--listener-param=url#jdbc:db2:SAMPLE
--listener-param=username#gfeadmin
--listener-param=password#admin1