Kafka transaction usage and programming examples

1, Overview

Kafka's transaction feature groups a series of producer message writes and consumer offset commits into a single atomic operation: within one transaction, producing messages and committing offsets either all succeed or all fail together.

  • Note: Kafka transactions vs. DB transactions.

    When first learning about message transactions, a common misunderstanding is to assume that business logic operating on the DB and operations on the message queue can be wrapped in one transaction, as shown below:

    void kafkaInTransaction() {
        // 1. Kafka operation: consume or produce messages
        kafkaOperation();
        // 2. DB operation
        dbOperation();
    }
    

    The data source for the DB operations is the database (e.g. MySQL), while the data source for the message operations is Kafka; they are completely different systems. A transaction belongs to a single data source, so these are two independent transactions. A Kafka transaction makes a series of Kafka operations (producing and consuming messages) atomic, while a DB transaction makes a series of insert/update/delete operations on the database atomic.
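    A minimal sketch of this distinction (the JDBC connection, table, and topic names here are hypothetical, and the producer is assumed to have already called initTransactions()): the two transactions are opened and committed through completely separate APIs, and committing or aborting one never affects the other.

    // Sketch only: a Kafka transaction and a DB transaction are managed independently.
    void kafkaAndDbTransactionsAreIndependent(KafkaProducer<String, String> producer,
                                              java.sql.Connection conn) throws Exception {
        // Kafka transaction: covers only the messages sent through this producer
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("some_topic", "some message"));
        producer.commitTransaction();       // commits only the Kafka writes

        // DB transaction: covers only the SQL statements on this connection
        conn.setAutoCommit(false);
        try (java.sql.Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("UPDATE t_user SET gender = 'male' WHERE id = 1");
            conn.commit();                  // commits only the DB writes
        }
        // If one of the two commits fails, the other is NOT rolled back automatically.
    }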

2, Use of transactions

The transaction feature in Kafka is mainly used in the following two scenarios:

  • Multiple messages sent by a producer are wrapped in one transaction and form an atomic operation: either all messages are sent successfully or none are (see the sketch after this list).

  • Consume-process-produce (read-process-write) mode: message consumption and production are wrapped in one transaction and form an atomic operation. In a stream processing application, a service often receives messages from upstream, processes them, and then sends the results downstream, which corresponds to consuming and producing messages.

    When a transaction contains only the consumer's consumption of messages, it is no different from the consumer manually committing offsets. Therefore, purely consuming messages is not the reason Kafka introduced the transaction mechanism; simple consumption does not need to be wrapped in a transaction.
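    A minimal sketch of the first scenario, using the same broker address and Producer/ProducerRecord/Properties classes as the full example in section 4 (the transactional.id value here is hypothetical): either all three messages become visible to read_committed consumers, or none does.

    Properties props = new Properties();
    props.put("bootstrap.servers", "node-1:9092");
    props.put("transactional.id", "multi-message-tx");   // hypothetical transactional.id
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    producer.initTransactions();
    try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("dwd_user", "message-1"));
        producer.send(new ProducerRecord<>("dwd_user", "message-2"));
        producer.send(new ProducerRecord<>("dwd_user", "message-3"));
        producer.commitTransaction();    // all three messages become visible together
    } catch (Exception e) {
        producer.abortTransaction();     // none of the three is delivered to read_committed consumers
    }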
    

 

3, Transaction-related APIs

  • API

        /**
         * Initialize transactions
         */
        public void initTransactions();

        /**
         * Begin a transaction
         */
        public void beginTransaction() throws ProducerFencedException;

        /**
         * Commit the offsets consumed within the transaction
         */
        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId);

        /**
         * Commit the transaction
         */
        public void commitTransaction() throws ProducerFencedException;

        /**
         * Abort the transaction
         */
        public void abortTransaction() throws ProducerFencedException;
    

 

  • Transaction configuration

    • Producer

      The transactional.id property must be set.
      Once transactional.id is set, the enable.idempotence property is automatically set to true.
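      A minimal sketch of the corresponding configuration (broker address and transactional.id match the full example in section 4):

      Properties producerProps = new Properties();
      producerProps.put("bootstrap.servers", "node-1:9092");
      producerProps.put("transactional.id", "dwd_user");   // required for transactions
      // enable.idempotence is switched to true automatically once transactional.id is set
      producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");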
      

 

 

  • Consumer

    The isolation.level property must be set to read_committed so that the consumer only reads messages from committed transactions. In addition, enable.auto.commit must be set to false to turn off automatic offset commits.
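    A minimal sketch of the corresponding configuration (broker address and group id match the full example in section 4):

    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "node-1:9092");
    consumerProps.put("group.id", "ods_user");
    consumerProps.put("isolation.level", "read_committed");   // read only messages from committed transactions
    consumerProps.put("enable.auto.commit", "false");         // offsets are committed through the producer's transaction
    consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");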
    

4, Transaction usage example

  • Requirement

    The Kafka topic ods_user contains some user data in the following format:
    
    name,gender,birthday
     Zhang San,1,1980-10-09
     Li Si,0,1985-11-01
    
    We need to write a program that converts the gender field into a readable value (1 → male, 0 → female) and writes the converted data to the topic dwd_user. A transaction guarantee is required: either the data is consumed, written to the output topic, and the offset is committed, or everything fails together.
    
    

 

  • Simulating data from the console

    # Create the topics ods_user and dwd_user
    bin/kafka-topics.sh --create --zookeeper node-1:2181 --topic ods_user --partitions 3 --replication-factor 2
    bin/kafka-topics.sh --create --zookeeper node-1:2181 --topic dwd_user --partitions 3 --replication-factor 2
    # Produce data to ods_user
    bin/kafka-console-producer.sh --broker-list node-1:9092 --topic ods_user
    # Consume data from dwd_user
    bin/kafka-console-consumer.sh --bootstrap-server node-1:9092 --topic dwd_user --from-beginning
    

 

  • Detailed code

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class TransUse {
        public static void main(String[] args) {
            Consumer<String, String> consumer = createConsumer();
            Producer<String, String> producer = createProducer();

            // Initialize the transaction
            producer.initTransactions();

            while (true) {
                try {
                    // 1. Begin the transaction
                    producer.beginTransaction();
                    // 2. Define a Map to hold the offset to commit for each partition
                    Map<TopicPartition, OffsetAndMetadata> offsetCommits = new HashMap<>();
                    // 3. Poll messages
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
                    for (ConsumerRecord<String, String> record : records) {
                        // 4. Record the offset to commit (next offset = current offset + 1)
                        offsetCommits.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                        // 5. Transform the record: 1 -> male, 0 -> female
                        String[] fields = record.value().split(",");
                        fields[1] = fields[1].equalsIgnoreCase("1") ? "male" : "female";
                        String message = fields[0] + "," + fields[1] + "," + fields[2];
                        // 6. Produce the message to dwd_user
                        producer.send(new ProducerRecord<>("dwd_user", message));
                    }
                    // 7. Send the consumed offsets to the transaction (the group id must match the consumer's group.id)
                    producer.sendOffsetsToTransaction(offsetCommits, "ods_user");
                    // 8. Commit the transaction
                    producer.commitTransaction();
                } catch (Exception e) {
                    // 9. Abort the transaction; in a real application the consumer should also
                    //    seek back to the last committed offsets so the aborted batch is re-processed
                    producer.abortTransaction();
                }
            }
        }

        // 1. Create the consumer
        public static Consumer<String, String> createConsumer() {
            // 1. Create the Kafka consumer configuration
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "node-1:9092");
            props.setProperty("group.id", "ods_user");
            props.put("isolation.level", "read_committed");
            props.setProperty("enable.auto.commit", "false");
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            // 2. Create the Kafka consumer
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            // 3. Subscribe to the topic to be consumed
            consumer.subscribe(Arrays.asList("ods_user"));

            return consumer;
        }

        // 2. Create the producer
        public static Producer<String, String> createProducer() {
            // 1. Create the producer configuration
            Properties props = new Properties();
            props.put("bootstrap.servers", "node-1:9092");
            props.put("transactional.id", "dwd_user");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // 2. Create the producer
            Producer<String, String> producer = new KafkaProducer<>(props);
            return producer;
        }
    }
    

 

  • Simulating an exception

    Insert an exception into the processing loop, right before the message is produced to dwd_user:

    // 4. Record the offset to commit (next offset = current offset + 1)
    offsetCommits.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
    // 5. Transform the record: 1 -> male, 0 -> female
    String[] fields = record.value().split(",");
    fields[1] = fields[1].equalsIgnoreCase("1") ? "male" : "female";
    String message = fields[0] + "," + fields[1] + "," + fields[2];

    // Simulate an exception
    int i = 1 / 0;

    // 6. Produce the message to dwd_user
    producer.send(new ProducerRecord<>("dwd_user", message));
    
    

    We can see that messages are still consumed, but when an exception occurs midway the offset is not committed. The transaction is committed only when both consuming the messages and producing the results succeed.

Reposted from:

https://blog.csdn.net/qq_36382679/article/details/108466254

 

-------------------------------------------

1, Transaction scenarios

  1. The simplest requirement is that multiple messages sent by a producer form one transaction; these messages need to become visible or invisible to consumers at the same time.
  2. A producer may send messages to multiple topics and partitions. These messages also need to be placed in one transaction, which forms a typical distributed transaction.
  3. A typical Kafka application first consumes from one topic, processes the data, and then produces to another topic. This consume-transform-produce flow needs to be put into one transaction: for example, if processing or producing fails, the consumed offsets must not be committed.
  4. The producer, or the application hosting it, may crash. After a new producer starts, it needs to know how to handle the previously unfinished transactions.
  5. A stream processing topology may be deep. If downstream can only read a message after the upstream transaction commits, the end-to-end latency can become very long and throughput drops a lot. Therefore two isolation levels, read_committed and read_uncommitted, need to be provided.

 

2, Several key concepts and derivation

1. Because the messages a producer sends may span multiple partitions and thus form a distributed transaction, the commonly used two-phase commit (2PC) protocol is adopted, and with it a Transaction Coordinator. The Transaction Coordinator is similar to the Group Coordinator introduced earlier: its election and failover follow the same approach to avoid split-brain and thundering-herd problems.
2. A transaction log is essential for transaction management. Kafka uses an internal topic to store the transaction log, consistent with its earlier design of using an internal topic to store consumer offsets. The transaction log is the persisted form of the state managed by the Transaction Coordinator; because there is no need to trace a transaction's historical states, the log only keeps the latest state of each transaction.
3. Because a transaction can end in either commit or abort, and clients can read at either the read_committed or read_uncommitted isolation level, the broker needs a way to mark the transaction outcome in the message log; these markers are called Control Messages.
4. If a producer crashes, restarts, or moves to another machine, it needs a unique identifier that associates it with its previous unfinished transactions: this is the TransactionalId. If one producer dies, another producer configured with the same TransactionalId can take over and resolve the unfinished transaction state. Be careful not to confuse the TransactionalId with the transaction id found in database transactions: Kafka currently has no global ordering, so there is no such transaction id; the TransactionalId is configured in advance by the user.
5. A TransactionalId is associated with exactly one producer, and two producers must not be active with the same TransactionalId at the same time. The producer epoch is therefore introduced to ensure that, for a given TransactionalId, only the producer holding the latest epoch is active; older producers are fenced off.
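A minimal sketch of this fencing behaviour (broker address, topic, and the transactional.id "tx-demo" are hypothetical; the classes are the same as in the example above). When a second producer calls initTransactions() with the same TransactionalId, the coordinator bumps the epoch, and the stale producer fails with ProducerFencedException the next time it talks to the coordinator:

    Properties props = new Properties();
    props.put("bootstrap.servers", "node-1:9092");
    props.put("transactional.id", "tx-demo");   // hypothetical id shared by both producers
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> first = new KafkaProducer<>(props);
    first.initTransactions();                    // registers epoch N for "tx-demo"

    Producer<String, String> second = new KafkaProducer<>(props);
    second.initTransactions();                   // same TransactionalId, epoch bumped to N + 1

    try {
        first.beginTransaction();
        first.send(new ProducerRecord<>("dwd_user", "from the stale producer"));
        first.commitTransaction();               // rejected: the epoch is stale
    } catch (org.apache.kafka.common.errors.ProducerFencedException e) {
        first.close();                           // the fenced producer must be closed; "second" carries on
    }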

 

3, Transaction semantics

3.1 Multi-partition atomic write

Transactions guarantee atomic writes across multiple partitions of Kafka topics: all messages in a transaction are either all successfully written or all discarded. For example, if an exception during processing causes the transaction to abort, none of the transaction's messages will be read by consumers (at read_committed). Now let's look at how Kafka implements the atomic "read-process-write" cycle.

First, consider what an atomic "read-process-write" cycle means. In short, if an application reads message A at offset X of topic-partition tp0 and, after some processing of A (e.g. B = F(A)), writes message B to topic-partition tp1, then the read-process-write cycle is atomic only if messages A and B are either both considered successfully consumed and published together, or not at all.

Now, message A is considered consumed from tp0 only when its offset X is marked as committed. In Kafka, offset commits are recorded by writing to an internal Kafka topic named __consumer_offsets. A message is considered successfully consumed only when its offset has been committed to __consumer_offsets.

Since an offset commit is just another write to a Kafka topic, and since a message is considered consumed only when its offset is committed, atomic writes across multiple topics and partitions also give us the atomic "read-process-write" cycle: the commit of offset X to the offsets topic and the write of message B to tp1 are part of a single transaction, so the whole cycle is atomic.

https://www.cnblogs.com/wangzhuxing/p/10125437.html

