SpringBoot distributed transaction solution (JTA+Atomic + multiple data sources)

First of all, what is A distributed transaction? For example, when we execute A business logic, we have two steps to operate A data source and B data source respectively. When we execute data changes in A data source and run-time exceptions occur in B data source, we must roll back the operation of B data source and roll back the operation of A data source; This situation often occurs in payment business; For example, if the ticket purchase business fails to pay in the end, the previous operations must be rolled back. If the previous operations are distributed among multiple data sources, this is A typical distributed transaction rollback;

After knowing what is distributed transaction, the solution of distributed transaction in java is JTA (Java Transaction API); springboot officially provided the solution of Atomikos or Bitronix;

In fact, in most cases, many companies use message queuing to implement distributed transactions.

This article focuses on integrating Atomikos +mysql+mybatis+tomcat/jetty under the springboot environment;

1, Project dependency

pom. Add springboot dependency of atomikos in XML:

<!--Distributed transaction-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>

Click in and you will find that it is integrated: transactions JMS, transactions JTA, transactions JDBC and javax transaction-api

2, Extract the relevant configuration items of the data source into an application In YML:

be careful:

  1. This time our spring datasource. Type # is com alibaba. druid. pool. xa. DruidXADataSource;

  2. spring. jta. The value of transaction manager ID is unique in your computer. Please read the official document for details;

The complete yml file is as follows:

spring:
  datasource:
    type: com.alibaba.druid.pool.xa.DruidXADataSource
    druid:
    
      systemDB:
        name: systemDB
        url: jdbc:mysql://localhost:3306/springboot-mybatis?useUnicode=true&characterEncoding=utf-8
        username: root
        password: root
        #The following is the supplementary settings of connection pool, which are applied to all the above data sources
        #Initialization size, min, Max
        initialSize: 5
        minIdle: 5
        maxActive: 20
        #Configure the timeout time for getting connections
        maxWait: 60000
        #How often is the configuration interval detected? Idle connections that need to be closed are detected in milliseconds
        timeBetweenEvictionRunsMillis: 60000
        #Configure the minimum lifetime of a connection in the pool, in milliseconds
        minEvictableIdleTimeMillis: 30
        validationQuery: SELECT 1
        validationQueryTimeout: 10000
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        #Open PSCache and specify the size of PSCache on each connection
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        filters: stat,wall
        #Open the mergeSql function through the connectProperties property; Slow SQL record
        connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
        #Merge monitoring data of multiple DruidDataSource
        useGlobalDataSourceStat: true
 
      businessDB:
        name: businessDB
 
        url: jdbc:mysql://localhost:3306/springboot-mybatis2?useUnicode=true&characterEncoding=utf-8
        username: root
        password: root
        #The following is the supplementary settings of connection pool, which are applied to all the above data sources
        #Initialization size, min, Max
        initialSize: 5
        minIdle: 5
        maxActive: 20
        #Configure the timeout time for getting connections
        maxWait: 60000
        #How often is the configuration interval detected? Idle connections that need to be closed are detected in milliseconds
        timeBetweenEvictionRunsMillis: 60000
        #Configure the minimum lifetime of a connection in the pool, in milliseconds
        minEvictableIdleTimeMillis: 30
        validationQuery: SELECT 1
        validationQueryTimeout: 10000
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
        #Open PSCache and specify the size of PSCache on each connection
        poolPreparedStatements: true
        maxPoolPreparedStatementPerConnectionSize: 20
        filters: stat,wall
        #Open the mergeSql function through the connectProperties property; Slow SQL record
        connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
        #Merge monitoring data of multiple DruidDataSource
        useGlobalDataSourceStat: true
 
  #jta related parameter configuration
  jta:
    log-dir: classpath:tx-logs
    transaction-manager-id: txManager

3, In druidconfig Realize the registration of multiple data sources in Java; Registration of distributed transaction manager; druid registration;

package com.zjt.config;
 
import com.alibaba.druid.filter.stat.StatFilter;
import com.alibaba.druid.support.http.StatViewServlet;
import com.alibaba.druid.support.http.WebStatFilter;
import com.alibaba.druid.wall.WallConfig;
import com.alibaba.druid.wall.WallFilter;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.env.Environment;
import org.springframework.transaction.jta.JtaTransactionManager;
 
import javax.sql.DataSource;
import javax.transaction.UserTransaction;
import java.util.Properties;
 
/**
 * Druid to configure
 *
 * 
 */
@Configuration
public class DruidConfig {
    @Bean(name = "systemDataSource")
    @Primary
    @Autowired
    public DataSource systemDataSource(Environment env) {
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        Properties prop = build(env, "spring.datasource.druid.systemDB.");
        ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        ds.setUniqueResourceName("systemDB");
        ds.setPoolSize(5);
        ds.setXaProperties(prop);
        return ds;
 
    }
 
    @Autowired
    @Bean(name = "businessDataSource")
    public AtomikosDataSourceBean businessDataSource(Environment env) {
 
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        Properties prop = build(env, "spring.datasource.druid.businessDB.");
        ds.setXaDataSourceClassName("com.alibaba.druid.pool.xa.DruidXADataSource");
        ds.setUniqueResourceName("businessDB");
        ds.setPoolSize(5);
        ds.setXaProperties(prop);
 
        return ds;
    }
 
 
    /**
     * Injection transaction manager
     * @return
     */
    @Bean(name = "xatx")
    public JtaTransactionManager regTransactionManager () {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        UserTransaction userTransaction = new UserTransactionImp();
        return new JtaTransactionManager(userTransaction, userTransactionManager);
    }
 
 
    private Properties build(Environment env, String prefix) {
 
        Properties prop = new Properties();
        prop.put("url", env.getProperty(prefix + "url"));
        prop.put("username", env.getProperty(prefix + "username"));
        prop.put("password", env.getProperty(prefix + "password"));
        prop.put("driverClassName", env.getProperty(prefix + "driverClassName", ""));
        prop.put("initialSize", env.getProperty(prefix + "initialSize", Integer.class));
        prop.put("maxActive", env.getProperty(prefix + "maxActive", Integer.class));
        prop.put("minIdle", env.getProperty(prefix + "minIdle", Integer.class));
        prop.put("maxWait", env.getProperty(prefix + "maxWait", Integer.class));
        prop.put("poolPreparedStatements", env.getProperty(prefix + "poolPreparedStatements", Boolean.class));
 
        prop.put("maxPoolPreparedStatementPerConnectionSize",
                env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class));
 
        prop.put("maxPoolPreparedStatementPerConnectionSize",
                env.getProperty(prefix + "maxPoolPreparedStatementPerConnectionSize", Integer.class));
        prop.put("validationQuery", env.getProperty(prefix + "validationQuery"));
        prop.put("validationQueryTimeout", env.getProperty(prefix + "validationQueryTimeout", Integer.class));
        prop.put("testOnBorrow", env.getProperty(prefix + "testOnBorrow", Boolean.class));
        prop.put("testOnReturn", env.getProperty(prefix + "testOnReturn", Boolean.class));
        prop.put("testWhileIdle", env.getProperty(prefix + "testWhileIdle", Boolean.class));
        prop.put("timeBetweenEvictionRunsMillis",
                env.getProperty(prefix + "timeBetweenEvictionRunsMillis", Integer.class));
        prop.put("minEvictableIdleTimeMillis", env.getProperty(prefix + "minEvictableIdleTimeMillis", Integer.class));
        prop.put("filters", env.getProperty(prefix + "filters"));
 
        return prop;
    }
 
    @Bean
    public ServletRegistrationBean druidServlet() {
        ServletRegistrationBean servletRegistrationBean = new ServletRegistrationBean(new StatViewServlet(), "/druid/*");
 
        //Console management user, add the following 2 lines, and you need to log in when you enter druid background
        //servletRegistrationBean.addInitParameter("loginUsername", "admin");
        //servletRegistrationBean.addInitParameter("loginPassword", "admin");
        return servletRegistrationBean;
    }
 
    @Bean
    public FilterRegistrationBean filterRegistrationBean() {
        FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean();
        filterRegistrationBean.setFilter(new WebStatFilter());
        filterRegistrationBean.addUrlPatterns("/*");
        filterRegistrationBean.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");
        filterRegistrationBean.addInitParameter("profileEnable", "true");
        return filterRegistrationBean;
    }
 
    @Bean
    public StatFilter statFilter(){
        StatFilter statFilter = new StatFilter();
        statFilter.setLogSlowSql(true); //slowSqlMillis is used to configure the standard of slow SQL. If the execution time exceeds slowSqlMillis, it is slow.
        statFilter.setMergeSql(true); //SQL merge configuration
        statFilter.setSlowSqlMillis(1000);//The default value of slowSqlMillis is 3000, which is 3 seconds.
        return statFilter;
    }
 
    @Bean
    public WallFilter wallFilter(){
        WallFilter wallFilter = new WallFilter();
        //Allow multiple SQL executions
        WallConfig config = new WallConfig();
        config.setMultiStatementAllow(true);
        wallFilter.setConfig(config);
        return wallFilter;
    }
 
}

4, Configure the sqlSessionFactory corresponding to each data source and the package scanned by MapperScan respectively:

MybatisDatasourceConfig.java

package com.zjt.config;
 
import com.zjt.util.MyMapper;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
 
import javax.sql.DataSource;
 
/**
 * 
 * @description
 */
@Configuration
//Accurate to the # mapper # directory to isolate from other data sources
@MapperScan(basePackages = "com.zjt.mapper", markerInterface = MyMapper.class, sqlSessionFactoryRef = "sqlSessionFactory")
public class MybatisDatasourceConfig {
 
    @Autowired
    @Qualifier("systemDataSource")
    private DataSource ds;
 
    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(ds);
        //Specify mapper xml directory
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        factoryBean.setMapperLocations(resolver.getResources("classpath:mapper/*.xml"));
        return factoryBean.getObject();
 
    }
 
    @Bean
    public SqlSessionTemplate sqlSessionTemplate() throws Exception {
        SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory()); // Use the Factory configured above
        return template;
    }
 
    //As for the transaction manager, both JPA and JDBC implement the self interface platform transaction manager
    // If you add spring boot starter JDBC dependency, the framework will inject DataSourceTransactionManager instance by default.
    //In the Spring container, our manual annotation @ Bean will be loaded first, and the framework will not re instantiate other PlatformTransactionManager implementation classes.
    /*@Bean(name = "transactionManager")
    @Primary
    public DataSourceTransactionManager masterTransactionManager() {
        //MyBatis Automatically participate in spring transaction management without additional configuration, as long as org mybatis. spring. Data source referenced by sqlsessionfactorybean
        // It can be consistent with the data source referenced by DataSourceTransactionManager, otherwise transaction management will not work.
        return new DataSourceTransactionManager(ds);
    }*/
 
}

MybatisDatasource2Config.java

package com.zjt.config;
 
import com.zjt.util.MyMapper;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
 
import javax.sql.DataSource;
 
/**
 * 
 * @description
 */
@Configuration
//Accurate to the # mapper # directory to isolate from other data sources
@MapperScan(basePackages = "com.zjt.mapper2", markerInterface = MyMapper.class, sqlSessionFactoryRef = "sqlSessionFactory2")
public class MybatisDatasource2Config {
 
    @Autowired
    @Qualifier("businessDataSource")
    private DataSource ds;
 
    @Bean
    public SqlSessionFactory sqlSessionFactory2() throws Exception {
        SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
        factoryBean.setDataSource(ds);
        //Specify mapper xml directory
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        factoryBean.setMapperLocations(resolver.getResources("classpath:mapper2/*.xml"));
        return factoryBean.getObject();
 
    }
 
    @Bean
    public SqlSessionTemplate sqlSessionTemplate2() throws Exception {
        SqlSessionTemplate template = new SqlSessionTemplate(sqlSessionFactory2()); // Use the Factory configured above
        return template;
    }
 
    //As for the transaction manager, both JPA and JDBC implement the self interface platform transaction manager
    // If you add spring boot starter JDBC dependency, the framework will inject DataSourceTransactionManager instance by default.
    //In the Spring container, our manual annotation @ Bean will be loaded first, and the framework will not re instantiate other PlatformTransactionManager implementation classes.
    /*@Bean(name = "transactionManager2")
    @Primary
    public DataSourceTransactionManager masterTransactionManager() {
        //MyBatis Automatically participate in spring transaction management without additional configuration, as long as org mybatis. spring. Data source referenced by sqlsessionfactorybean
        // It can be consistent with the data source referenced by DataSourceTransactionManager, otherwise transaction management will not work.
        return new DataSourceTransactionManager(ds);
    }*/
 
}

Since we only use one transaction manager in this example: xatx, we no longer use txadviceinterceptor Java and txadvice2interceptor The transaction manager configured in Java; Children's shoes in need can configure other transaction managers by themselves; (see DruidConfig.java)

5, Create a new distributed business test interface jtatestservice Java and implementation class jtatestserviceimpl java

In fact, base note is a very simple test01() method. In this method, we call classService. successively. saveOrUpdateTClass(tClass); And teacherservice saveOrUpdateTeacher(teacher);

Realize the operation of two data sources successively: then we can debug ourselves to track the submission time of the transaction. In addition, we can also manually create a runtime exception after the full execution of the two methods to check whether all distributed transactions are rolled back;

be careful:

In the method of implementing the class, I use:

@Transactional(transactionManager = "xatx", propagation = Propagation.REQUIRED, rollbackFor = { java.lang.RuntimeException.class })

Thus, it specifies which transaction manager to use, the transaction isolation level (usually my default), and the rollback conditions (usually exception can be used). These three can be modified according to the actual business;

package com.zjt.service3;
 
import java.util.Map;
 
public interface JtaTestService {
 
    public Map<String,Object> test01();
 
}
package com.zjt.service3.impl;
 
 
import com.zjt.entity.TClass;
import com.zjt.entity.Teacher;
import com.zjt.service.TClassService;
import com.zjt.service2.TeacherService;
import com.zjt.service3.JtaTestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
 
import java.util.LinkedHashMap;
import java.util.Map;
 
@Service("jtaTestServiceImpl")
public class JtaTestServiceImpl implements JtaTestService{
 
    @Autowired
    @Qualifier("teacherServiceImpl")
    private TeacherService teacherService;
    @Autowired
    @Qualifier("tclassServiceImpl")
    private TClassService tclassService;
 
    @Override
    @Transactional(transactionManager = "xatx", propagation = Propagation.REQUIRED, rollbackFor = { java.lang.RuntimeException.class })
    public Map<String, Object> test01() {
        LinkedHashMap<String,Object> resultMap=new LinkedHashMap<String,Object>();
        TClass tClass=new TClass();
        tClass.setName("8888");
        tclassService.saveOrUpdateTClass(tClass);
 
        Teacher teacher=new Teacher();
        teacher.setName("8888");
        teacherService.saveOrUpdateTeacher(teacher);
 
        System.out.println(1/0);
 
        resultMap.put("state","success");
        resultMap.put("message","Distributed transaction synchronization succeeded");
        return resultMap;
    }
}

6, Create jtatecontoller Java, accept an http request from the front end and trigger the test01 method of JtaTestService:

package com.zjt.web;
 
 
import com.zjt.service3.JtaTestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
 
import java.util.LinkedHashMap;
import java.util.Map;
 
@Controller
@RequestMapping("/jtaTest")
public class JtaTestContoller {
 
    @Autowired
    @Qualifier("jtaTestServiceImpl")
    private JtaTestService taTestService;
 
 
 
    @ResponseBody
    @RequestMapping("/test01")
    public Map<String,Object> test01(){
        LinkedHashMap<String,Object> resultMap=new LinkedHashMap<String,Object>();
        try {
            return taTestService.test01();
        }catch (Exception e){
            resultMap.put("state","fail");
            resultMap.put("message","Distributed transaction synchronization failed");
            return resultMap;
        }
    }
}

7, In test Add a button in FTL to test;

//Distributed transaction test
$("#JTATest").click(function(){
    $.ajax({
        type: "POST",
        url: "${basePath!}/jtaTest/test01",
        data: {}    ,
        async: false,
        error: function (request) {
            layer.alert("Connection to the server failed/(ㄒoㄒ)/~~");
            return false;
        },
        success: function (data) {
            if (data.state == 'fail') {
                layer.alert(data.message);
                return false;
            }else if(data.state == 'success'){
                layer.alert(data.message);
            }
        }
    });
});
 
<button class="layui-btn" id="JTATest">Insert the class and teacher named 8888 into the class and teacher table at the same time</button>

8, Start the service and verify the result:

Click this button to jump to controller:

After the sql statement is executed normally, we can find that the database has not changed because the transaction of the whole method has not been completed. When we reach the step of 1 / 0:

Throw a runtime exception, which is intercepted by the spring transaction interceptor, and catch the exception:

In this completeTransactionAfterThrowing(txInfo, var16); Method will roll back all transactions:

22:09:04.243 logback [http-nio-8080-exec-5] INFO c.a.i.imp.CompositeTransactionImp - rollback() done of transaction 192.168.1.103.tm0000400006

At this time, when we open the database verification again, there is still no change, which proves that the distributed transaction configuration is successful;

You can practice by yourself based on my code and try the flexible configuration in the case of using multi transaction manager;

9, Postscript:

Source code of this article:

https://github.com/zhaojiatao/springboot-zjt-chapter10-springboot-atomikos-mysql-mybatis-druid.git

The code can complete transaction rollback in both tomcat and jetty environments;

A Transactional not active warning may be reported when a transaction is rolled back. After I google, foreigners can't say the specific role. Most people think it's just a warning and can be ignored;  

Tags: Spring IDE

Posted by pugs1501 on Fri, 15 Apr 2022 18:08:41 +0930