1, Using Kafka to communicate in Java
Dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
Sender code
public class MyKafkaProducer extends Thread {

    // Producer API
    KafkaProducer<Integer, String> producer;
    String topic; // topic to send to

    public MyKafkaProducer(String topic) {
        Properties properties = new Properties();
        // Connection string for the brokers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.150:9092,192.168.10.151:9092,192.168.10.152:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "my-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Create the producer from the connection properties
        producer = new KafkaProducer<Integer, String>(properties);
        this.topic = topic;
    }

    @Override
    public void run() {
        int num = 0;
        while (num < 20) {
            String msg = "practice test message:" + num;
            try {
                // Synchronous send: get() blocks until the send result is returned
                producer.send(new ProducerRecord<Integer, String>(topic, msg)).get();
                TimeUnit.SECONDS.sleep(2);
                num++;
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            /* Asynchronous send with a callback notification:
            try {
                String msg = "my kafka practice msg:" + num;
                producer.send(new ProducerRecord<>(topic, msg), (metadata, exception) -> {
                    System.out.println(metadata.offset() + "->" + metadata.partition() + "->" + metadata.topic());
                });
                TimeUnit.SECONDS.sleep(2);
                ++num;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            */
        }
    }

    public static void main(String[] args) {
        new MyKafkaProducer("test_partition").start();
    }
}
Consumer code
public class MyKafkaConsumer extends Thread {

    KafkaConsumer<Integer, String> consumer;
    String topic;

    public MyKafkaConsumer(String topic) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.150:9092,192.168.10.151:9092,192.168.10.152:9092");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-gid3");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        // Automatic commit (batch acknowledgement) every 5 seconds
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // When a consumer in a new group consumes this topic, start from the earliest message
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumer = new KafkaConsumer<Integer, String>(properties);
        this.topic = topic;
    }

    @Override
    public void run() {
        consumer.subscribe(Collections.singleton(this.topic));
        while (true) {
            ConsumerRecords<Integer, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            consumerRecords.forEach(record -> {
                // e.g. null->my kafka practice msg:0->63
                System.out.println(record.key() + "->" + record.value() + "->" + record.offset());
            });
        }
    }

    public static void main(String[] args) {
        new MyKafkaConsumer("test_partition").start();
    }
}
2, Asynchronous sending
Kafka supports both synchronous and asynchronous message sending. The example above sends messages synchronously.
Synchronous sending blocks the caller, while asynchronous sending does not wait. In essence, Kafka always sends messages to the broker asynchronously: instead of transmitting each message to the broker immediately, the client puts messages into a send queue, and a background thread continuously takes messages from the queue and sends them, triggering a callback after a successful send. The client accumulates a certain number of messages and assembles them into a batch before sending; the trigger conditions are batch.size and linger.ms.
Synchronous sending amounts to calling get() on the returned Future to wait for the result of each send, but this seriously hurts sending performance.
// Modified run method in the producer: asynchronous send with a callback
@Override
public void run() {
    int num = 0;
    while (num < 20) {
        try {
            String msg = "my kafka practice msg:" + num;
            // Asynchronous send: the callback is invoked when the send completes
            producer.send(new ProducerRecord<>(topic, msg), (metadata, exception) -> {
                System.out.println(metadata.offset() + "->" + metadata.partition() + "->" + metadata.topic());
            });
            TimeUnit.SECONDS.sleep(2);
            ++num;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
batch.size
When the producer sends multiple messages to the same partition on the broker, the messages are submitted in batches to reduce the performance overhead of network requests. This parameter controls the byte size of a batch. The default is 16384 bytes (16 KB): once a batch of messages reaches the configured batch.size, it is sent as a whole.
linger.ms
By default, the producer aggregates all requests collected in the interval between two sends and then transmits them together to improve throughput, while linger.ms adds a small delay to each request sent to the broker so that more message requests can be aggregated. This is similar to the Nagle algorithm in TCP, which reduces the transmission of large numbers of small packets by briefly buffering them before sending.
batch.size and linger.ms are the key parameters for Kafka producer performance tuning, and they clearly serve the same purpose: deciding when a batch is sent. If both are configured, how do they interact? When both are set, the batch is sent to the broker as soon as either condition is met, as the sketch below shows.
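As a minimal sketch (the values are illustrative, not tuned recommendations), the two parameters are set through ProducerConfig just like the other producer properties:

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.150:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Send when a batch for a partition reaches 32 KB (default 16384 bytes = 16 KB)...
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
// ...or when 20 ms have passed since the first record entered the batch (default 0)
properties.put(ProducerConfig.LINGER_MS_CONFIG, 20);
// Whichever condition is met first triggers the send to the broker
KafkaProducer<Integer, String> producer = new KafkaProducer<>(properties);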
3, Basic configuration analysis
group.id
A consumer group is the scalable and fault-tolerant consumption mechanism provided by Kafka. A group contains one or more consumer instances that share a common identifier, the group.id. All consumers in the group coordinate to consume all partitions of the subscribed topics, and each partition is consumed by exactly one consumer within the same group.
As shown in the figure below, three consumers belong to two different groups. For firstTopic, consumers in both groups consume the topic's messages simultaneously; in this arrangement, firstTopic behaves like the topic concept in ActiveMQ.
As shown in the figure below, if all three consumers belong to the same group, firstTopic instead behaves like a Queue: each message is delivered to only one of them. A sketch of both modes follows.
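A minimal sketch of the two modes, assuming MyKafkaConsumer is extended with a second constructor argument for the group id (the original code above hard-codes "my-gid3"):

public static void main(String[] args) {
    // Same group: the two consumers split firstTopic's partitions between them,
    // so each message is processed by only one of them (queue semantics)
    new MyKafkaConsumer("firstTopic", "group-a").start();
    new MyKafkaConsumer("firstTopic", "group-a").start();
    // Different group: this consumer also receives every message of firstTopic,
    // independently of group-a (topic/broadcast semantics)
    new MyKafkaConsumer("firstTopic", "group-b").start();
}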
enable.auto.commit
When automatic commit is enabled, the consumer automatically commits offsets after consuming; once a message's offset has been committed, the message will not be delivered to the group again. The commit frequency is controlled with auto.commit.interval.ms.
Of course, we can also commit manually by calling consumer.commitSync(), as in the sketch below.
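A minimal sketch of manual committing, based on the consumer above: auto commit is disabled, and commitSync() is called once the records of each poll have been processed:

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("test_partition"));
while (true) {
    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
    records.forEach(record -> System.out.println(record.key() + "->" + record.value()));
    // Blocks until the broker acknowledges the commit of the offsets
    // returned by this poll; throws on failure
    consumer.commitSync();
}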
auto.offset.reset
This parameter applies to consumers with a new group.id, i.e. a group that has no committed offset for the topic yet. Depending on its value, consuming the topic has different semantics (a configuration sketch follows the list):
- auto.offset.reset=latest: the new consumer starts from the latest offset in the log, i.e. it only receives messages produced after it subscribes.
- auto.offset.reset=earliest: the new consumer starts from the earliest message in the topic.
- auto.offset.reset=none: since the new group has no committed offset, an exception is thrown as soon as the consumer polls.
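For illustration, the three values plug into the same ConsumerConfig setup used earlier; the group id here is a hypothetical example:

// A group id the cluster has never seen, so no committed offset exists for it
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-new-gid");
// earliest: replay from the beginning of each partition's log
// latest:   only messages produced after this consumer subscribes
// none:     poll() throws NoOffsetForPartitionException
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");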
max.poll.records
This setting limits the number of records returned by each call to poll(), which makes it easier to predict the maximum amount of data to be processed per poll interval. Lowering this value shortens the processing time per poll, helping the consumer call poll() again before the poll interval expires. A short sketch follows.
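A minimal sketch of capping the batch size per poll, added to the consumer configuration above (100 is an arbitrary example value):

// At most 100 records per poll(), so each loop iteration has a bounded
// amount of work and finishes well within max.poll.interval.ms
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));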
4, Spring Boot + Kafka
There is a compatibility table between Spring Boot/spring-kafka versions and Kafka versions. If the dependency is not introduced according to the matching version, a version conflict will surface as a ClassNotFoundException. For details, refer to
https://spring.io/projects/spring-kafka
jar package dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.4</version>
</dependency>
KafkaProducer
@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send() {
        kafkaTemplate.send("test", "msgKey", "msgData");
    }
}
KafkaConsumer
@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"test"})
    public void listener(ConsumerRecord<String, String> record) {
        Optional<?> msg = Optional.ofNullable(record.value());
        if (msg.isPresent()) {
            System.out.println(msg.get());
        }
    }
}
application configuration
spring.kafka.bootstrap-servers=192.168.10.150:9092,192.168.10.151:9092,192.168.10.152:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
test
public static void main(String[] args) {
    ConfigurableApplicationContext context = SpringApplication.run(KafkaDemoApplication.class, args);
    KafkaProducer kafkaProducer = context.getBean(KafkaProducer.class);
    for (int i = 0; i < 3; i++) {
        kafkaProducer.send();
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
5, Custom partitioner
MyPartition
public class MyPartition implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // The partition can be chosen from the key, so that certain kinds of messages
        // always land in the same partition; the key is specified when sending
        // if (key == X) {...}
        System.out.println("enter");
        // Get all partitions of the topic
        List<PartitionInfo> list = cluster.partitionsForTopic(topic);
        int leng = list.size();
        // Null key: assign the message to a random partition
        if (key == null) {
            Random random = new Random();
            return random.nextInt(leng);
        }
        // Non-null key: assign by hashing the key modulo the partition count
        return Math.abs(key.hashCode()) % leng;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    public static void main(String[] args) {
        System.out.println(Math.abs("my-gid1".hashCode()) % 50);
    }
}
Register the custom partitioner in the producer code
public MyKafkaProducer(String topic) {
    Properties properties = new Properties();
    // Connection string for the brokers
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.150:9092,192.168.10.151:9092,192.168.10.152:9092");
    properties.put(ProducerConfig.CLIENT_ID_CONFIG, "my-producer");
    // Register the custom partitioner
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kafka.MyPartition");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Create the producer from the connection properties
    producer = new KafkaProducer<Integer, String>(properties);
    this.topic = topic;
}