The principle and code of RocketMQ transaction messages

1, Origin of transaction messages

1. Case

Quoting the official shopping example:

When Xiaoming buys something for 100 yuan, 100 yuan must be deducted from his account and, at the same time, 100 points must be added to his account in the downstream points system. The account system and the points system are two independent systems: one has to deduct 100 yuan and the other has to add 100 points.

2. Problems

  • The account service deducts the money and notifies the points system successfully, but the points system fails to add the points, so the data is inconsistent.
  • The account service deducts the money, but the notification to the points system fails, so the points are never added and the data is inconsistent.

3. Solution

RocketMQ's solution to the first problem: if consumption fails, the message is retried automatically. If it still fails after several retries, the case has to be handled manually, for example by putting the message in the dead letter queue and then investigating the cause by hand.

RocketMQ's solution to the second problem: if the deduction succeeds but the message cannot be written to MQ, RocketMQ rolls the message back, and we can roll back our deduction as well.
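
The "retry, then dead letter" handling described for the first problem can be sketched roughly as follows. This is a toy model in plain Java, not RocketMQ's actual implementation: the class name, the `deliver` method and the `maxAttempts` parameter are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Toy model of "retry, then dead-letter"; not RocketMQ's real code. */
final class RetryThenDeadLetter {
    /** Messages that exhausted their retries and now need manual inspection. */
    static final List<String> deadLetterQueue = new ArrayList<>();

    /**
     * Hands the message to the handler up to maxAttempts times.
     * Returns the attempt number that succeeded, or -1 if the message was dead-lettered.
     */
    static int deliver(String message, Predicate<String> handler, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (handler.test(message)) {
                return attempt; // consumed successfully
            }
        }
        deadLetterQueue.add(message); // give up and park it for a human
        return -1;
    }

    public static void main(String[] args) {
        // A handler that fails twice before succeeding (e.g. the points DB was briefly down).
        int[] calls = {0};
        int used = deliver("add 100 points", m -> ++calls[0] >= 3, 5);
        System.out.println("succeeded on attempt " + used);

        // A handler that always fails ends up in the dead-letter queue.
        deliver("add 100 points (always fails)", m -> false, 5);
        System.out.println("dead letters: " + deadLetterQueue);
    }
}
```

The point of the sketch is only the control flow: a failing consumer does not lose the message, it either succeeds on a later attempt or is set aside for manual handling.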

2, Principle of transaction message

1. Principle diagram

2. Detailed process

  • The producer sends a Half Message to the broker.

Frankly, "half message" is a confusing name. It is really a prepare message, that is, a message sent in advance that consumers cannot see yet.

  • After the Half Message is sent successfully, the producer executes the local transaction.
  • If the local transaction succeeds, the callback returns commit; if it fails, it returns rollback. (The developer decides whether to commit or roll back in the callback method of the transaction message.)
  • The producer sends the commit or rollback from the previous step to the broker. There are two cases:

1. The broker receives the commit/rollback:

  • If a commit is received, the broker considers the whole transaction successful and delivers the message to the Consumer.
  • If a rollback is received, the broker considers the local transaction failed, deletes the Half Message, and does not deliver it to the Consumer.

2. The broker does not receive the result (for example, the producer goes down while executing the local transaction; a local transaction that returns unknown is handled the same way, as if the broker never received the confirmation):

  • The broker periodically checks back with the producer for the result of the local transaction: if the check finds that the local transaction succeeded, the producer returns commit; if not, it returns rollback.
  • The check result is sent back to the broker. If it is a commit, the broker considers the whole transaction successful. If it is a rollback, the broker considers the local transaction failed, deletes the Half Message, and does not deliver it to the consumer. If the broker does not receive the check result (or the result is unknown), it repeats the check-back periodically until the final transaction result is found. The interval and the maximum number of check-backs are configurable.
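
The decision logic in the steps above can be condensed into a small state machine. The following is a minimal sketch in plain Java under stated assumptions: the enums, the `resolve` method and the `maxChecks` parameter are hypothetical names, the periodic timer is replaced by a simple loop, and nothing here is RocketMQ's real broker code.

```java
import java.util.function.Supplier;

/** Toy model of how a broker settles a half message; names are illustrative only. */
final class HalfMessageResolution {
    enum State { COMMIT, ROLLBACK, UNKNOWN }
    enum Outcome { DELIVERED, DELETED, DISCARDED_AFTER_MAX_CHECKS }

    /**
     * firstReply: what the producer reported after its local transaction
     *             (UNKNOWN also stands for "no reply arrived").
     * checkBack:  the producer's answer to each check-back.
     * maxChecks:  how many check-backs the broker attempts before giving up.
     */
    static Outcome resolve(State firstReply, Supplier<State> checkBack, int maxChecks) {
        State state = firstReply;
        for (int checks = 0; state == State.UNKNOWN && checks < maxChecks; checks++) {
            state = checkBack.get(); // broker asks the producer again
        }
        switch (state) {
            case COMMIT:   return Outcome.DELIVERED; // message becomes visible to consumers
            case ROLLBACK: return Outcome.DELETED;   // half message is dropped
            default:       return Outcome.DISCARDED_AFTER_MAX_CHECKS;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(State.COMMIT, () -> State.UNKNOWN, 15));
        System.out.println(resolve(State.ROLLBACK, () -> State.UNKNOWN, 15));
        System.out.println(resolve(State.UNKNOWN, () -> State.COMMIT, 15));
        System.out.println(resolve(State.UNKNOWN, () -> State.UNKNOWN, 15));
    }
}
```

Case 1 in the text corresponds to a `firstReply` of COMMIT or ROLLBACK, where no check-back is needed; case 2 corresponds to UNKNOWN, where the loop keeps asking until it gets a definite answer or runs out of attempts.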

3, Transaction message implementation process

1. Implementation process


To put it simply, a transaction message is a message sent together with a listener that has callback methods. In the callback we run the business logic, for example deducting 100 yuan from the account, with the message for the points system going to MQ. If the deduction succeeds and the message has reached MQ, we set the message status to commit, and the broker moves the half message to the real topic. Until then the message is stored in the half message queue and does not exist in the queue of the real topic; it is transferred only after the commit is confirmed.
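
The "stored in the half message queue first, moved to the real topic on commit" behaviour can be pictured with a tiny in-memory model. This is only a sketch: `HalfQueueSketch` and its methods are invented for illustration and say nothing about how the broker stores messages internally.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy broker: half messages stay hidden until committed. Illustrative only. */
final class HalfQueueSketch {
    private final Map<String, String> halfQueue = new HashMap<>(); // txId -> message body
    private final List<String> realTopic = new ArrayList<>();      // what consumers can read

    /** Stores the message where consumers cannot see it. */
    void sendHalf(String txId, String body) {
        halfQueue.put(txId, body);
    }

    /** Moves the message from the half queue to the real topic. */
    void commit(String txId) {
        String body = halfQueue.remove(txId);
        if (body != null) {
            realTopic.add(body);
        }
    }

    /** Deletes the half message; consumers never see it. */
    void rollback(String txId) {
        halfQueue.remove(txId);
    }

    List<String> visibleToConsumers() {
        return new ArrayList<>(realTopic);
    }

    int pendingHalfMessages() {
        return halfQueue.size();
    }

    public static void main(String[] args) {
        HalfQueueSketch broker = new HalfQueueSketch();
        broker.sendHalf("tx-1", "add 100 points (order A)");
        broker.sendHalf("tx-2", "add 100 points (order B)");
        System.out.println("before commit: " + broker.visibleToConsumers());
        broker.commit("tx-1");
        broker.rollback("tx-2");
        System.out.println("after commit:  " + broker.visibleToConsumers());
        System.out.println("pending half messages: " + broker.pendingHalfMessages());
    }
}
```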

2. Remedial plan

If the local transaction cannot report its result immediately because of a crash or a network problem, RocketMQ treats the state as unknown. RocketMQ transaction messages provide a remedy for this: the broker periodically queries the transaction status of the message. This is also a callback method, and it can be used for compensation. The developer writes the compensation logic; if the check finds that the local transaction succeeded, the callback returns commit and the transaction is finished.
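
One common way to write such a check-back callback is to look up a record of the local transaction's outcome. The sketch below assumes the business code writes that record itself (here an in-memory map stands in for a database table); `TransactionLogCheck`, `recordResult` and `check` are hypothetical names, and the enum only mirrors the three states discussed above rather than using RocketMQ's own types.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of check-back logic driven by a local transaction record. Illustrative only. */
final class TransactionLogCheck {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Stand-in for a table that the business code writes when the local transaction finishes.
    private final Map<String, Boolean> txLog = new ConcurrentHashMap<>();

    /** Called by the business code once the local transaction has a definite result. */
    void recordResult(String txId, boolean succeeded) {
        txLog.put(txId, succeeded);
    }

    /** What a check-back callback could return for the given transaction id. */
    State check(String txId) {
        Boolean succeeded = txLog.get(txId);
        if (succeeded == null) {
            return State.UNKNOWN; // no record yet: still running, or crashed before recording
        }
        return succeeded ? State.COMMIT : State.ROLLBACK;
    }

    public static void main(String[] args) {
        TransactionLogCheck log = new TransactionLogCheck();
        System.out.println("tx-1 before record: " + log.check("tx-1"));
        log.recordResult("tx-1", true);
        log.recordResult("tx-2", false);
        System.out.println("tx-1 after record:  " + log.check("tx-1"));
        System.out.println("tx-2 after record:  " + log.check("tx-2"));
    }
}
```

Returning unknown when no record exists leaves the decision to a later check-back instead of guessing.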

4, Code example

1. Code

package com.chentongwei.mq.rocketmq;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * Description: sends five transaction messages and shows commit, rollback and check-back handling.
 *
 * @author TongWei.Chen 2020-06-21 11:32:58
 */
public class ProducerTransaction2 {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("my-transaction-producer");
        producer.setNamesrvAddr("124.57.180.156:9876");

        // Callback
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
                LocalTransactionState state;
                // msg-1: the local transaction succeeds, return COMMIT_MESSAGE
                if ("msg-1".equals(message.getKeys())) {
                    state = LocalTransactionState.COMMIT_MESSAGE;
                }
                // msg-2: the local transaction fails, return ROLLBACK_MESSAGE
                else if ("msg-2".equals(message.getKeys())) {
                    state = LocalTransactionState.ROLLBACK_MESSAGE;
                } else {
                    // msg-3, msg-4, msg-5: return UNKNOW to simulate a crash during the local transaction
                    // (or a local transaction that succeeded but whose confirmation message was lost)
                    state = LocalTransactionState.UNKNOW;
                }
                System.out.println(message.getKeys() + ",state:" + state);
                return state;
            }

            /**
             * Back checking method of transaction message
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                if (null != messageExt.getKeys()) {
                    switch (messageExt.getKeys()) {
                        case "msg-3":
                            System.out.println("msg-3 unknow");
                            return LocalTransactionState.UNKNOW;
                        case "msg-4":
                            System.out.println("msg-4 COMMIT_MESSAGE");
                            return LocalTransactionState.COMMIT_MESSAGE;
                        case "msg-5":
                            //The query found that the execution of the local transaction failed, and the message needs to be rolled back.
                            System.out.println("msg-5 ROLLBACK_MESSAGE");
                            return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                }
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        producer.start();

        //Simulate sending 5 messages
        for (int i = 1; i < 6; i++) {
            try {
                Message msg = new Message("transactionTopic", null, "msg-" + i, ("Test, this is the transaction message! " + i).getBytes());
                producer.sendMessageInTransaction(msg, null);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

2. Results

msg-1,state:COMMIT_MESSAGE
msg-2,state:ROLLBACK_MESSAGE
msg-3,state:UNKNOW
msg-4,state:UNKNOW
msg-5,state:UNKNOW

msg-3 unknow
msg-3 unknow
msg-5 ROLLBACK_MESSAGE
msg-4 COMMIT_MESSAGE

msg-3 unknow
msg-3 unknow
msg-3 unknow
msg-3 unknow

3. Console

4. Result analysis

  • Only msg-1 and msg-4 were delivered successfully. In the console, msg-4 is listed ahead of msg-1 because the list is in reverse chronological order: msg-1 was committed first, and msg-4 was committed only after the check-back.
  • First, five results are printed, one for each of the five messages:

msg-1,state:COMMIT_MESSAGE
msg-2,state:ROLLBACK_MESSAGE
msg-3,state:UNKNOW
msg-4,state:UNKNOW
msg-5,state:UNKNOW

  • Then the check-back starts. msg-3 is still unknown, msg-5 is rolled back, and msg-4 is committed. So at this point msg-4 becomes visible in the console.
  • After a while, msg-3 is checked again and is still unknown, so the broker keeps checking it.

The interval and the number of check-backs are configurable. By default a message is checked back at most 15 times (the broker parameter transactionCheckMax); if the state is still unknown after that, the message is discarded.

5, Question

Question: don't Spring transactions and conventional distributed transactions already work? Is RocketMQ's transaction message unnecessary?

MQ is used for decoupling. Previously, a distributed transaction operated on the account system and the points system directly, which couples the two tightly. With an MQ in between, the account system sends a message to MQ after its operation: if the send succeeds, the operation is committed, and if the send fails, it is rolled back. Guaranteeing this step is exactly what the transaction message is for. Moreover, many distributed transaction solutions are built on RocketMQ.

Tags: MQ

Posted by Danaldinho on Sun, 03 Apr 2022 05:43:54 +0930