com.atlassian.confluence.content.render.xhtml.migration.exceptions.UnknownMacroMigrationException: The macro 'next_previous_links2' is unknown.

Handling Distributed Transactions in the Message Broker

The Message Broker profile in WSO2 Enterprise Integrator (WSO2 EI) supports XA transactions (distributed transactions). That is, the broker profile of WSO2 EI can take the place of a resource manager and participate in a distributed transaction along with other resource managers (database servers, SAP R/3 system, etc.) that have the same support for distributed transactions.

The below diagram illustrates how the broker profile of WSO2 EI and two RDBMSs are set up as resource managers for a distributed transaction. In this example, a client (ESB profile of WSO2 EI) is sending an XA transaction. Each of the messages included in the XA transaction should be delivered to the relevant resource manager. However, if at least one of the resource managers fail to receive the message successfully, the XA transaction should be canceled.

Diagram: XA transaction sent to multiple resource managers.

In addition to the Resource Managers, the Application Program (AP) and the Transaction Manager(TM) are two other components that participate in the process of executing a distributed transaction. The AP and the TM components invoke different portions of the XA publisher implementation: The XAResource (calling the XA API) is handled by the TM, whereas that JMS session is handled by the AP.

Note that the TM and AP components are already embedded and set up in the ESB profile of WSO2 EI. Therefore, when you use the ESB profile as your JMS client, you do not have to implement these sections (relevant to the TM and AP) in the ESB's XA publisher (for example the proxy service).

However, if you are not using WSO2 EI's ESB to publish XA transactions to the broker, refer the sample XA publisher implementation given below. See how the TM and AM components are used in this implementation.

 Sample XA Publisher Implementation
public class XAPublisherSample {
    public static void main(String[] args) throws Exception {
       InitialContext initialContext = getInitialContext();

        XAConnectionFactory connectionFactory = (XAConnectionFactory) initialContext
                .lookup("QueueConnectionfactory");
        Destination xaTestQueue = (Destination) initialContext.lookup("queueName");

        XAConnection xaConnection = connectionFactory.createXAConnection();
        xaConnection.start();
        XASession xaSession = xaConnection.createXASession();

        // Get XAResource and JMS session from the XASession. XAResource is given to 
        // the TM and JMS Session is given to the AP.
        XAResource xaResource = xaSession.getXAResource();
        Session session = xaSession.getSession();

        // AP
        MessageProducer producer = session.createProducer(xaTestQueue);
        // TM
        Xid xid = generateNewXid();
        xaResource.start(xid, XAResource.TMNOFLAGS);
        // AP
        producer.send(session.createTextMessage("Test 1"));
        // TM
        xaResource.end(xid, XAResource.TMSUCCESS);
        int ret = xaResource.prepare(xid);

        If (ret == XAResource.XA_OK) {
           xaResource.commit(xid, false);
        }

        // Closing the JMS Session
        session.close();
        
        // Closing the XASession 
        xaSession.close();
        
        xaConnection.close();
    }
}

Read more about the XA specification in Java Transaction API (JTA) Specification 1.1.

Now, let's look at how distributed transactions work with a single broker node as the resource manager.For example, let's have the ESB profile of WSO2 EI as the client sending the distributed transaction. The messages belonging to this transaction should be delivered to three separate queues/topics in the Broker node. If at least one message fails, all the messages are rolled back.

Diagram: XA transaction distributed among queues in the broker.

Using WSO2 EI's Broker profile for XA transactions

Now, let's look at the various configurations that affect XA transactions when you work with WSO2 EI's Message Broker profile.

See the following topics for details:

MVCC-enabled database for the MB store

The Message Broker profile of WSO2 EI requires a separate database for storing MB-specific data. When you use XA transactions, be sure that MVCC is enabled for the database you use.

See Changing the Default Broker Database for instructions.

Configuring the maximum number of XA transactions

You can configure the maximum number of distributed transactions that can be handled in parallel by the Message Broker profile of WSO2 EI. To do this, set the following parameter in the broker.xml file (stored in the <EI_HOME>/wso2/broker/repository/conf directory).

<transaction>
    <maxParallelDtxChannels>20</maxParallelDtxChannels>
</transaction>

Failover of XA transactions

As explained above, the behavior of XA transactions is straightforward when you use a single broker node. However, if you have a clustered setup, you need to correctly configure the failover method for you XA transactions.

Let's consider a scenario where the client (for example, the ESB profile) is sending messages to a cluster of broker nodes. In a typical multi-node cluster, when one node fails to connect with the client, the client should be able to connect to the next available node (i.e., failover to the next available node) and continue the transactions. This behavior of failing over to the next available node should be configured using the failover method for the connection.

However, XA transactions sent to the broker cluster should not be allowed to failover from one node to another before all the messages (belonging to that transacted connection) are successfully committed to one node. Therefore, in the event that the connection between the client (sending the XA transaction) and the broker node breaks before all the messages are successfully committed to that node, all the messages should be rolled back. That is, the XA transaction should be canceled. The client can then reconnect to the next available broker node in the cluster and resend all the XA transactions to that broker node. The onetime failover method supports this requirement for XA transactions.

Test distributed transactions with WSO2 EI's Message Broker

You can see how this works by following the steps given below.

  1. Follow the instructions in configuring the ESB profile with the Message Broker profile
  2. Start the Message Broker profile and create three queues: mbqueue1, mbqueue2, and mbqueue3
  3. Start the ESB profile and add a proxy service to mediate messages to the broker. To handle XA transactions, the proxy service should be configured as shown below. In this example, the ESB listens to a JMS queue named MyJMSQueue and consumes messages and sends messages to multiple JMS queues in a transactional manner.

     Sample Proxy Service
    <?xml version="1.0" encoding="UTF-8"?><proxy xmlns="http://ws.apache.org/ns/synapse"
           name="JMSListenerProxy"
           startOnLoad="true"
           statistics="disable"
           trace="disable"
           transports="jms,http,https">
       <target>
          <inSequence>
             <property name="OUT_ONLY" value="true"/>
             <log level="custom">
                <property expression="get-property('MessageID')" name="MESSAGE_ID_A"/>
             </log>
             <log level="custom">
                <property expression="$body" name="BEFORE"/>
             </log>
             <property expression="get-property('MessageID')"
                       name="MESSAGE_ID_B"
                       scope="operation"
                       type="STRING"/>
             <property description="FailureResultProperty"
                       name="failureResultProperty"
                       scope="default">
                <result xmlns="">failure</result>
             </property>
             <enrich>
                <source clone="true" xpath="$ctx:failureResultProperty"/>
                <target type="body"/>
             </enrich>
             <log level="custom">
                <property expression="$body" name="AFTER"/>
             </log>
             <property name="BEFORE1" scope="axis2" type="STRING" value="ABCD"/>
             <callout serviceURL="jms:/MBQueue1?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=conf/jndi.properties&amp;transport.jms.DestinationType=queue&amp;transport.jms.TransactionCommand=begin">
                <source type="envelope"/>
                <target xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/"
                        xmlns:s12="http://www.w3.org/2003/05/soap-envelope"
                        xpath="s11:Body/child::*[fn:position()=1] | s12:Body/child::*[fn:position()=1]"/>
             </callout>
             <callout serviceURL="jms:/MBQueue2?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=conf/jndi.properties&amp;transport.jms.DestinationType=queue">
                <source type="envelope"/>
                <target xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/"
                        xmlns:s12="http://www.w3.org/2003/05/soap-envelope"
                        xpath="s11:Body/child::*[fn:position()=1] | s12:Body/child::*[fn:position()=1]"/>
             </callout>
             <callout serviceURL="jms:/MBQueue3?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=conf/jndi.properties&amp;transport.jms.DestinationType=queue&amp;transport.jms.TransactionCommand=end">
                <source type="envelope"/>
                <target xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/"
                        xmlns:s12="http://www.w3.org/2003/05/soap-envelope"
                        xpath="s11:Body/child::*[fn:position()=1] | s12:Body/child::*[fn:position()=1]"/>
             </callout>
             <drop/>
          </inSequence>
          <faultSequence>
             <log level="custom">
                <property name="Transaction Action" value="Rollbacked"/>
             </log>
             <callout serviceURL="jms:/MBQueueDLQ?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=conf/jndi.properties&amp;transport.jms.DestinationType=queue&amp;transport.jms.TransactionCommand=rollback">
                <source type="envelope"/>
                <target xmlns:s11="http://schemas.xmlsoap.org/soap/envelope/"
                        xmlns:s12="http://www.w3.org/2003/05/soap-envelope"
                        xpath="s11:Body/child::*[fn:position()=1] | s12:Body/child::*[fn:position()=1]"/>
             </callout>
          </faultSequence>
       </target>
       <parameter name="transport.jms.Destination">MyJMSQueue</parameter>
       <parameter name="transport.jms.ContentType">
          <rules xmlns="">
             <jmsProperty>contentType</jmsProperty>
             <default>application/xml</default>
          </rules>
       </parameter>
       <description/>
    </proxy>
  4. Now, you can disable one queue in the Broker profile and send a message to the ESB profile. The proxy service will attempt to dispatch the message to all three queues. However, since one queue is unavailable, the message will not be delivered to any of the queues.

com.atlassian.confluence.content.render.xhtml.migration.exceptions.UnknownMacroMigrationException: The macro 'next_previous_links' is unknown.