Distributed transaction solution

Data should neither be lost for no reason nor grow inexplicably.

1. Overview

1. Years ago, when Zhizhi was working on a project at a small company, a single service handled everything, so data consistency issues were dealt with directly through local transactions.

2. As time went on and the user base grew, a single Java service could no longer keep up, so the technical lead decided to upgrade the system. The single service was split along business lines, and the developers were divided up as well: each developer builds and maintains only one or a few services, with everyone performing their own duties and cooperating with the others.


3. Of course, service splitting is not accomplished overnight. It is a huge, time-consuming and labor-intensive project. Most systems go through multiple rounds of splitting before slowly settling into a stable shape, following one core idea: first split along the overall business, then refine the split within each resulting service module.

4. After the split, the system could handle the user load, but the data was now spread across different services, which raised a new question: how do we guarantee data consistency across services? Of course, a cross-service distributed system has more than just this problem; there is a whole series of others, such as service availability, fault tolerance, and network issues in inter-service calls. Only the data consistency problem is discussed here.

5. Data consistency can be roughly divided into three models: strong consistency, weak consistency, and eventual consistency.

  • Strong consistency: once data is written, the latest value can be read at any time.
  • Weak consistency: after data is written, a read elsewhere may not return the latest value.
  • Eventual consistency: a variant of weak consistency. The system does not require the data to be consistent at every moment, but after a certain period of time the data must eventually become consistent.

From these three models we can see that weak and eventual consistency are generally implemented with asynchronous redundancy, while strong consistency relies on synchronous redundancy. Asynchronous processing brings better performance but requires compensation logic for the data; synchronous processing is simpler but inevitably reduces system performance.

2. Theory

The data consistency problem described above is essentially the distributed transaction problem. Several solutions already exist, and most readers will have seen them to some extent; here is a quick review.

2.1. Two-phase commit (2PC)

2PC is a strong-consistency design that coordinates the commit and rollback of each local transaction (also called a transaction participant) by introducing a transaction coordinator.
2PC is divided into two phases:

1. First phase: the transaction coordinator sends a transaction-start command to each participant; each participant performs its prepare work and replies to the coordinator whether the preparation succeeded.
The local transactions are not committed yet, but resources must be locked during this phase.

2. Second phase: after the coordinator receives the replies from all participants, it tallies them. If every participant replied "can commit", the coordinator sends a commit command, each participant formally commits its local transaction and releases its resources, and the global transaction ends. If any participant replied "refuse to commit", the coordinator sends a rollback command, all participants roll back their local transactions, resources are released once the rollback completes, and the global transaction is cancelled.

Transaction commit process

Transaction rollback process


Of course, 2PC has its problems, which are worth mentioning here. One is synchronous blocking, which costs performance. The other is coordinator failure: once the coordinator goes down, all participants are left blocked with their resources still locked.
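
To make the two phases concrete, here is a minimal, hypothetical sketch of the coordinator's decision logic. The Participant interface and its method names are assumptions for illustration, not the API of any real framework.

// Hypothetical abstraction of a transaction participant, for illustration only
interface Participant {
    boolean prepare();   // phase 1: do the prepare work, lock resources, vote yes/no
    void commit();       // phase 2: commit the local transaction and release locks
    void rollback();     // phase 2: roll back the local transaction and release locks
}

// Minimal sketch of the coordinator's phase-1 / phase-2 decision
class TwoPhaseCommitCoordinator {

    boolean execute(List<Participant> participants) {
        // Phase 1: every participant prepares and votes; resources stay locked meanwhile
        boolean allPrepared = true;
        for (Participant participant : participants) {
            allPrepared &= participant.prepare();
        }

        // Phase 2: commit only if all voted yes, otherwise roll everyone back
        for (Participant participant : participants) {
            if (allPrepared) {
                participant.commit();
            } else {
                participant.rollback();
            }
        }
        return allPrepared;
    }
}

If the coordinator crashes between the two loops, every participant is left waiting with its resources locked, which is exactly the blocking problem described above.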

2.2. Three-phase commit (3PC)

3PC is an improvement on top of 2PC, mainly aimed at solving its blocking problem. It splits the first phase of 2PC into two steps: prepare first, then lock resources, and it introduces a timeout mechanism (which also means data inconsistency can occur).

The three phases of 3PC are CanCommit, PreCommit, and DoCommit.

Without going into the details, the core point is this: resources are not locked during CanCommit; locking only begins once all participants have agreed.

2.3. TCC flexible transactions

Compared with 2PC and 3PC, the essential difference of TCC is that it is a distributed transaction at the business level, whereas the other two work at the database level. TCC is an abbreviation of three words: Try, Confirm, and Cancel, and its process is divided into these three steps.

Try: attempt the operation, i.e. try to reserve and lock the resources.

Confirm: confirm the operation, i.e. apply the reserved resources; retry if execution fails.

Cancel: cancel the operation, i.e. release the reserved resources; retry if execution fails.

As the figure above shows, TCC intrudes heavily into the business and is tightly coupled with it. Compared with 2PC and 3PC, TCC has a wider range of application and can implement distributed transactions across databases and across different systems. The drawback is that a large amount of logic must be written in the business code to implement these three steps, and it is coupled with that code, which increases development cost.
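
To show how the three steps land in business code, here is a minimal sketch for a premium-deduction case. The class, the frozen-amount idea, and the method names are illustrative assumptions rather than the API of any specific TCC framework.

// Hypothetical TCC participant for deducting an insurance premium, for illustration only
public class PremiumTccAction {

    /** Try: reserve the resource, e.g. freeze the premium instead of deducting it outright. */
    public boolean tryDeduct(String orderNo, BigDecimal amount) {
        // e.g. UPDATE account SET frozen = frozen + amount WHERE balance - frozen >= amount
        // also record a "try" log keyed by orderNo, for idempotency and anti-hanging checks
        return true;
    }

    /** Confirm: apply the reserved resource; must be idempotent because failures are retried. */
    public void confirm(String orderNo) {
        // balance -= amount, frozen -= amount; skip silently if this orderNo was already confirmed
    }

    /** Cancel: release the reserved resource; must tolerate an empty rollback when try never ran. */
    public void cancel(String orderNo) {
        // frozen -= amount only if a matching "try" log exists; otherwise record an empty rollback
    }
}

The transaction log, idempotency, and anti-hanging points below all exist to make these three methods safe to retry and to receive out of order.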

Transaction log: in TCC mode, both the transaction initiator and the transaction participants record a transaction log (transaction status, data, etc.). This log is the key to committing or rolling back the whole distributed transaction when something unexpected happens (downtime, restart, network interruption, etc.).

Idempotency: in the second stage of TCC, both the confirm and the cancel operations must be idempotent, because if execution fails for reasons such as the network, they will be retried continuously.

Anti-hanging: because the network is unreliable, in abnormal situations a try request may arrive later than its cancel request. The cancel then performs an empty rollback, and the late-arriving try request must not reserve any resources.

2.4. Seata

I won't say much about Seata here. Its most widely used mode is AT mode, which I studied and analyzed step by step last time. Once it is configured, you only need to add the @GlobalTransactional annotation to the method that initiates the transaction to open a global transaction, with no intrusion into the business and low coupling. If you are interested, see the earlier article on Seata.
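
For reference, here is a minimal sketch of what that looks like. The service, client, and method names are hypothetical, and it assumes Seata AT mode is already configured for the data sources involved.

/**
 * Minimal sketch only: assumes Seata AT mode is already configured;
 * OrderDbService and PointsFeignClient are hypothetical names.
 */
@Service
@AllArgsConstructor
public class PlaceOrderFacade {

    private final OrderDbService orderDbService;   // writes to the order database
    private final PointsFeignClient pointsClient;  // remote service backed by another database

    @GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
    public void placeOrder(Long userId) {
        orderDbService.createOrder(userId); // branch transaction 1 (local database)
        pointsClient.addPoints(userId);     // branch transaction 2 (remote service)
        // Any exception thrown here makes Seata roll back both branches via its undo log
    }
}

Compared with the TCC sketch above, the business method stays free of transaction plumbing.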

3. Application scenarios

I once encountered the following business scenario at a company: a user applies for insurance through a page and submits an order. The order goes through an upstream service that handles the policy-related business logic, and then flows into downstream services that handle performance, personnel promotion, profit sharing, and so on. In this scenario the business logic on the two sides does not live in the same service and accesses different databases, so as soon as data consistency is involved, a distributed transaction is needed.

The schemes introduced above were discussed only in terms of theory and ideas. Let me summarize the implementation used in this business scenario: a local message table plus asynchronous MQ messages, achieving eventual consistency for the transaction. This suited the business at the time and, compared with strong consistency, delivers higher performance. The diagram below illustrates the idea.

  1. Real business processing may involve multiple states, so it must be clear which states require compensation by the scheduled task.
  2. If a particular record can never be processed, the scheduled task must not retry it without limit, so the local message table needs the concept of rounds: after a certain number of retries an alarm is raised and manual intervention is required.
  3. Because of MQ and the scheduled task, duplicate requests will inevitably occur, so the downstream must implement idempotent de-duplication; otherwise duplicate data will appear and the data will end up inconsistent.

For the concrete implementation, let's go straight to the code. First define two tables, tb_order and tb_notice_message, to store order information and local transaction messages respectively.

CREATE TABLE `tb_order` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key id',
  `user_id` int(11) NOT NULL COMMENT 'ordering user id',
  `order_no` varchar(255) CHARACTER SET latin1 NOT NULL COMMENT 'order number',
  `insurance_amount` decimal(16,2) NOT NULL COMMENT 'Insurance amount',
  `order_amount` decimal(16,2) DEFAULT NULL COMMENT 'Premium',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  `is_delete` tinyint(4) DEFAULT '0' COMMENT 'delete flag: 0-not deleted; 1-deleted',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;
CREATE TABLE `tb_notice_message` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key id',
  `type` tinyint(4) NOT NULL COMMENT 'Business Type: 1-place an order',
  `status` tinyint(4) NOT NULL DEFAULT '1' COMMENT 'status: 1-pending, 2-processed, 3-early warning',
  `data` varchar(255) NOT NULL COMMENT 'information',
  `retry_count` tinyint(4) DEFAULT '0' COMMENT 'number of retries',
  `create_time` datetime NOT NULL COMMENT 'creation time',
  `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
  `is_delete` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'delete flag: 0-not deleted; 1-deleted',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=utf8mb4;
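
The Java code below builds these rows with Lombok builders and persists them through MyBatis-Plus mappers. As a point of reference, a minimal NoticeMessage entity matching the table above might look like this (the Order entity follows the same pattern); the annotations are the usual Lombok and MyBatis-Plus ones and are an assumption, not code from the original project.

/**
 * Minimal entity sketch assumed by the examples below (Lombok + MyBatis-Plus),
 * mirroring the tb_notice_message table above.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("tb_notice_message")
public class NoticeMessage {

    @TableId(type = IdType.AUTO)
    private Integer id;
    private Integer type;              // business type: 1 - place an order
    private Integer status;            // 1 - pending, 2 - processed, 3 - warning
    private String data;               // message payload (JSON)
    private Integer retryCount;        // number of retries so far
    private LocalDateTime createTime;
    private LocalDateTime updateTime;
    private Integer isDelete;          // 0 - not deleted, 1 - deleted
}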

To handle order creation, we can use the decorator pattern covered earlier to decorate the service: saving the local transaction message and sending the MQ message go into the decorator class, while the service only needs to care about business logic, which also follows the open-closed principle.

/**
 * @author past events
 * @version 1.0
 * @date 2022/12/13 10:58
 * @description
 */
@Service
@Slf4j
@AllArgsConstructor
public class OrderService implements BaseHandler<Object, Order> {

    private final OrderMapper orderMapper;

    /**
     * Order handling method: only processes the order-related logic
     * @param o
     * @return
     */
    @Override
    public Order handle(Object o) {
        // order information
        Order order = Order.builder()
                .orderNo("2345678")
                .createTime(LocalDateTime.now())
                .userId(1)
                .insuranceAmount(new BigDecimal(2000000))
                .orderAmount(new BigDecimal(5000))
                .build();
        orderMapper.insert(order);
        return order;
    }
}

The new decorator class OrderServiceDecorate extends the order logic: it saves the local transaction message and sends the MQ message. The extension method carries a @Transactional annotation so that the order logic and the local transaction message are written in the same transaction, guaranteeing atomicity. Marking the transaction message as processed happens later, only after the downstream service has finished its business logic and called back.

/**
 * @author past events
 * @version 1.0
 * @date 2022/12/14 18:48
 * @description
 */
@Slf4j
@AllArgsConstructor
@Decorate(scene = SceneConstants.ORDER, type = DecorateConstants.CREATE_ORDER)
public class OrderServiceDecorate extends AbstractHandler {

    private final NoticeMessageMapper noticeMessageMapper;

    private final RabbitTemplate rabbitTemplate;

    /**
     * Decoration method: extend the order processing logic
     * @param o
     * @return
     */
    @Override
    @Transactional
    public Object handle(Object o) {
        // Call the service method to implement policy logic
        Order order = (Order) service.handle(o);
        // Extension: 1. Save transaction messages, 2. Send MQ messages
        // local transaction message
        String data = "{\"orderNo\":\"2345678\", \"userId\":1, \"insuranceAmount\":2000000, \"orderAmount\":5000}";
        NoticeMessage noticeMessage = NoticeMessage.builder()
                .retryCount(0)
                .data(data)
                .status(1)
                .type(1)
                .createTime(LocalDateTime.now())
                .build();
        noticeMessageMapper.insert(noticeMessage);
        // send mq message
        log.info("send mq information....");
        rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage));
        return null;
    }
}

I have covered this decorator pattern before; see the earlier article for details.

The downstream service listens for the message, and after processing its own business logic (performance, profit sharing, promotion, etc.) it sends an MQ message back; the upstream service listens for that message and updates the local transaction message status to processed. Note that the downstream service must handle idempotency, because the upstream will retry the data in abnormal situations.

/**
 * @author past events
 * @version 1.0
 * @date 2022/12/13 18:07
 * @description
 */
@Component
@Slf4j
@RabbitListener(queues = "trans.queue")
public class FenRunListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitHandler
    public void orderHandler(String msg) {
        log.info("Listen to the order message:{}", msg);
        // Idempotency must be handled here; the idempotent check comes before the business logic
        log.info("Downstream service business logic...");
        JSONObject json = JSONUtil.parseObj(msg);
        rabbitTemplate.convertAndSend("trans", "trans.update.order.queue.key", json.getInt("id"));
    }
}

A quick digression on idempotency handling; there are roughly two approaches:
1. Check whether a record already exists for the order number; if it does, return success directly.
2. Store a unique request number in Redis and remove it once processing is done; if the request number is no longer there, return success directly. This can be wrapped in an AOP aspect to keep it isolated from the business logic, as in the sketch below.
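
As a rough illustration of the second idea, the sketch below "claims" the request number by deleting it from Redis, which is atomic; the key prefix and class name are assumptions, and in practice this would sit behind an AOP aspect as mentioned above.

// Rough sketch of the Redis request-number idea; key prefix and class name are assumptions
@Component
public class IdempotentChecker {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * The upstream writes "trans:req:{orderNo}" into Redis before sending the MQ message.
     * DELETE is atomic: the first delivery to delete the key gets true and does the work,
     * any replayed delivery gets false and can simply return success without reprocessing.
     */
    public boolean firstDelivery(String orderNo) {
        return Boolean.TRUE.equals(redisTemplate.delete("trans:req:" + orderNo));
    }
}

In FenRunListener, the handler would call firstDelivery(...) before doing any work and return immediately when it yields false.
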
Back to the main thread: the upstream service listens for the message sent by the downstream, updates the local transaction message to processed, and the distributed transaction flow ends.

/**
 * @author past events
 * @version 1.0
 * @date 2022/12/13 18:29
 * @description
 */
@Component
@Slf4j
@RabbitListener(queues = "trans.update.order.queue")
public class OrderListener {

    @Autowired
    private NoticeMessageMapper noticeMessageMapper;

    @RabbitHandler
    public void updateOrder(Integer msgId) {
        log.info("Listen to messages, update local transaction messages, message id:{}", msgId);
        NoticeMessage msg = NoticeMessage.builder().status(2).id(msgId).updateTime(LocalDateTime.now()).build();
        noticeMessageMapper.updateById(msg);
    }
}

When an abnormal situation occurs, a scheduled task polls the pending messages and re-sends them to MQ, doing its best to bring the downstream service to data consistency. Of course, an upper limit must be set on the retries; if it still fails after reaching the limit, the downstream service itself probably has a problem (possibly a bug in its code logic).

/**
 * @author past events
 * @version 1.0
 * @date 2022/12/14 10:25
 * @description
 */
@Configuration
@EnableScheduling
@AllArgsConstructor
@Slf4j
public class RetryOrderJob {

    private final RabbitTemplate rabbitTemplate;

    private final NoticeMessageMapper noticeMessageMapper;

    /**
     * Maximum number of automatic retries
     */
    private final Integer MAX_RETRY_COUNT = 5;

    @Scheduled(cron = "0/20 * * * * ? ")
    public void retry() {
        log.info("Timing tasks, retrying abnormal orders");
        LambdaQueryWrapper<NoticeMessage> wrapper = Wrappers.lambdaQuery(NoticeMessage.class);
        wrapper.eq(NoticeMessage::getStatus, 1);
        List<NoticeMessage> noticeMessages = noticeMessageMapper.selectList(wrapper);
        for (NoticeMessage noticeMessage : noticeMessages) {
            // resend mq message
            rabbitTemplate.convertAndSend("trans", "trans.queue.key", JSONUtil.toJsonStr(noticeMessage));
            // Retries +1
            noticeMessage.setRetryCount(noticeMessage.getRetryCount() + 1);
            noticeMessageMapper.updateById(noticeMessage);
            // Once the retry count reaches the maximum, switch the message to the warning status
            if (noticeMessage.getRetryCount() >= MAX_RETRY_COUNT) {
                noticeMessage.setStatus(3);
                noticeMessageMapper.updateById(noticeMessage);
                // Send an alarm and notify the relevant staff
                // Alarm logic (SMS, email, WeCom group, etc.)...
            }
        }
    }
}

There is actually still a problem here: when one upstream service corresponds to multiple downstream services, a single local message record is often not enough.

  1. One option is to add an extra field, next_server_count, to the message table to record how many downstream services the order initiator needs to call. Each downstream callback decrements it by 1, and only when the value reaches 0 is the status updated to processed. Since this field is shared by multiple downstream services, concurrency must be controlled, as shown in the sketch after this list.
  2. Another option is to record one transaction message per downstream service and use the type field to distinguish them, giving each transaction message a one-to-one relationship between upstream and downstream.
  3. Finally, once the maximum number of retries is reached, the message can be added to an alarm list. This list can be shown in an admin console or other monitoring system with the necessary information, so that internal staff can intervene manually, handle the abnormal data, and bring it to eventual consistency.
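
For the first option, the decrement has to be atomic so that concurrent downstream callbacks do not race. A minimal sketch, assuming an extra next_server_count column and a matching getNextServerCount() getter on the entity (both hypothetical), could look like this.

// Sketch of option 1: atomic decrement of a hypothetical next_server_count column.
// The column, the getNextServerCount() getter, and this helper class are illustrative assumptions.
@Component
@AllArgsConstructor
public class DownstreamCallbackHandler {

    private final NoticeMessageMapper noticeMessageMapper;

    /** Called once per downstream callback for the given local message id. */
    public void onDownstreamCallback(Integer msgId) {
        // Atomic decrement in the database so concurrent callbacks do not race:
        // UPDATE tb_notice_message SET next_server_count = next_server_count - 1
        //   WHERE id = ? AND next_server_count > 0
        noticeMessageMapper.update(null, Wrappers.lambdaUpdate(NoticeMessage.class)
                .setSql("next_server_count = next_server_count - 1")
                .eq(NoticeMessage::getId, msgId)
                .gt(NoticeMessage::getNextServerCount, 0));

        // Mark the message as processed only once the counter has reached zero
        noticeMessageMapper.update(null, Wrappers.lambdaUpdate(NoticeMessage.class)
                .set(NoticeMessage::getStatus, 2)
                .eq(NoticeMessage::getId, msgId)
                .eq(NoticeMessage::getNextServerCount, 0));
    }
}

Both updates are plain conditional UPDATE statements, so no extra application-level locking is needed even when several downstream callbacks arrive at the same time.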

4. Summary

In reality there is no perfect solution for distributed transactions; we can only try to satisfy the business requirements and keep the data consistent as far as possible. Whatever the program cannot handle falls to people, who must carry out a compensation plan for the data.

5. Reference source code

Programming documentation:
https://gitee.com/cicadasmile/butte-java-note

Application repository:
https://gitee.com/cicadasmile/butte-flyer-parent
