This topic provides code fragments for an example of an application running a transaction directly and an example of an application invoking a function that contains a transaction in VMware Tanzu GemFire.
An application can run a transaction directly or invoke a function which contains a transaction. This section illustrates these two use cases with code fragments that demonstrate the proper way to program a transaction.
An expected use case operates on two regions within a transaction. For performance purposes the Tanzu GemFire transaction implementation requires that region entries of partitioned regions be colocated. See Custom-Partitioning and Colocating Data for details on how to colocate region entries.
An application/client uses the CacheTransactionManager
API. This most basic code fragment shows the structure of a transaction, with its begin
to start the transaction, commit
to end the transaction, and handling of exceptions that these methods may throw.
CacheTransactionManager txManager =
cache.getCacheTransactionManager();
try {
txManager.begin();
// ... do transactional, region operations
txManager.commit();
} catch (CommitConflictException conflict) {
// ... do necessary work for a transaction that failed on commit
} finally {
// All other exceptions will be handled by the caller.
// Examples of some exceptions: the data is not colocated, a rebalance
// interfered with the transaction, or the server is gone.
// Any exception thrown by a method other than commit() needs
// to do a rollback to avoid leaking the transaction state.
if(txManager.exists()) {
txManager.rollback();
}
}
More details of a transaction appear in this next application/client code fragment example. In this typical transaction, the put operations must be atomic and two regions are involved.
In this transaction, a customer’s purchase is recorded. The cash
region contains each customer’s cash balance available for making trades. The trades
region records each customer’s balance spent on trades.
If there is a conflict upon commit of the transaction, an exception is thrown, and this example tries again.
// inputs needed for this transaction; shown as variables for simplicity
final String customer = "Customer1";
final Integer purchase = 1000;
// region set up shown to promote understanding
Cache cache = new CacheFactory().create();
Pool pool = PoolManager.createFactory()
.addLocator("localhost", LOCATOR_PORT)
.create("pool-name");
Region<String, Integer> cash =
cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
.setPoolName(pool.getName())
.create("cash");
Region<String, Integer> trades =
cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
.setPoolName(pool.getName())
.create("trades");
// transaction code
CacheTransactionManager txManager = cache.getCacheTransactionManager();
boolean retryTransaction = false;
do {
try {
txManager.begin();
// Subtract out the cost of the trade for this customer's balance
Integer cashBalance = cash.get(customer);
Integer newBalance = (cashBalance != null ? cashBalance : 0) - purchase;
cash.put(customer, newBalance);
// Add in the cost of the trade for this customer
Integer tradeBalance = trades.get(customer);
newBalance = (tradeBalance != null ? tradeBalance : 0) + purchase;
trades.put(customer, newBalance);
txManager.commit();
retryTransaction = false;
}
catch (CommitConflictException conflict) {
// entry value changed causing a conflict for this customer, so try again
retryTransaction = true;
} finally {
// All other exceptions will be handled by the caller.
// Any exception thrown by a method other than commit() needs
// to do a rollback to avoid leaking the transaction state.
if(txManager.exists()) {
txManager.rollback();
}
}
} while (retryTransaction);
Design transactions such that any get operations are within the transaction. This causes those entries to be part of the transactional state, which is desired such that intersecting transactions can be detected and signal commit conflicts.
A transaction may be embedded in a function. The application invokes the function, and the function contains the transaction that does the begin
, the region operations, and the commit
or rollback
.
This use of a function can have performance benefits. The performance benefit results from both the function and the region data residing on servers. As the function invokes region operations, those operations on region entries stay on the server, so there is no network round trip time to do get or put operations on region data.
This function example accomplishes atomic updates on a single region representing the quantity of products available in inventory. Doing this in a transaction prevents double allocating inventory for two orders placed simultaneously.
/**
* Atomically reduce inventory quantity
*/
public class TransactionalFunction extends Function {
/**
* Returns true if the function had the requested quantity of
* inventory and successfully completed the transaction to
* record the reduced inventory that fulfills the order.
*/
@Override
public void execute(FunctionContext context) {
RegionFunctionContext rfc = (RegionFunctionContext) context;
Region<ProductId, Integer> inventoryRegion = rfc.getDataSet();
CacheTransactionManager
txManager = context.getCache().getCacheTransactionManager();
// single argument will be a ProductId and a quantity
ProductRequest request = (ProductRequest) rfc.getArguments();
ProductId productRequested = request.getProductId();
Integer qtyRequested = request.getQuantity();
boolean success = false;
do {
boolean commitConflict = false;
try {
txManager.begin();
Integer qtyAvailable = inventoryRegion.get(productRequested);
if (qtyAvailable >= qtyRequested) {
// enough inventory is available, so process request
Integer remaining = qtyAvailable - qtyRequested;
inventoryRegion.put(productRequested, remaining);
txManager.commit();
success = true;
}
} catch (CommitConflictException conflict) {
// retry transaction, as another request on this same key succeeded,
// so this transaction attempt failed
commitConflict = true;
} finally {
// All other exceptions will be handled by the caller; however,
// any exception thrown by a method other than commit() needs
// to do a rollback to avoid leaking the transaction state.
if(txManager.exists()) {
txManager.rollback();
}
}
} while (commitConflict);
context.getResultSender().lastResult(success);
}
@Override
public String getId() {
return "TxFunction";
}
/**
* Returning true causes this function to execute on the server
* that holds the primary bucket for the given key. It can save a
* network hop from the secondary to the primary.
*/
@Override
public boolean optimizeForWrite() {
return true;
}
}
The application-side details on function implementation are not covered in this example. The application sets up the function context and the argument. See the section on Function Execution for details on functions.
The function implementation needs to catch the commit conflict exception such that it can retry the entire transaction. The exception only occurs if another request for the same product intersected with this one, and that other request’s transaction committed first.
The optimizeForWrite
method is defined to cause the system to execute the function on the server that holds the primary bucket for the given key. It can save a network hop from the secondary to the primary.
Note that the variable qtyAvailable
is a reference, because the Region.get
operation returns a reference within this server-side code. Read Region Operations Return References for details and how to work around the implications of a reference as a return value when working with server code.